#include "config.h"

#define DBG_SUBSYS S_LIBCONTROL

#include "lich_api.h"
#include "profile.h"

#include "../chunk/chunk_bh.h"
#include "../chunk/chunk_cleanup.h"
#include "lich_ctx.h"
#include "../task/recovery.h"
#include "dbg.h"
#include "volume_proto_eclog.h"
#include "volume_ctl.h"

/*
 * Compile-time switch: guards the insert-task context type, its task wrapper
 * and the err_cond label used by the batched chunk-create path in this file.
 */
#define ENABLE_TABLE2_INSERT_PARALLEL 1
/*
 * Threshold used by __table2_chunk_create_multi__: it waits for a task to
 * finish while the number of outstanding insert tasks is greater than this.
 */
#define TABLE2_INSERT_MAX_TASK  10


/* Forward declarations of helpers that are used before they are defined. */
STATIC int __table2_chunk_localize(table2_t *table2, const chkid_t *chkid, void *volume_proto);
STATIC int __table2_load_l2_chunk(table2_t *table2, const chkid_t *chkid);
STATIC int __table2_chunk_check__(table2_t *table2, const chkid_t *chkid, int op, int *oflags);

/**
 * Lock 3.1: one lock per sub-volume; the table1 lock (lock 2) must be acquired first.
 */
// {{
/**
 * Write-lock the rwlock_sub slot selected by tid->idx.
 *
 * Once the lock is granted table2->ltime is inspected; a value of 0 (the
 * value the __table_* helpers store after a failed table_proto modification)
 * makes the function release the lock again and report EAGAIN.
 *
 * @param table2 table2 instance owning the lock table
 * @param tid    id whose idx selects the lock slot
 * @return 0 with the lock held; otherwise the ltable_wrlock() error or
 *         EAGAIN, with the lock not held
 */
STATIC int __table2_wrlock_sub(table2_t *table2, const chkid_t *tid)
{
        int ret = ltable_wrlock(&table2->rwlock_sub, tid->idx);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (table2->ltime != 0) {
                return 0;
        }

        /* ltime == 0: hand the lock back and let the caller retry */
        ret = EAGAIN;
        GOTO(err_lock, ret);

err_lock:
        ltable_unlock(&table2->rwlock_sub, tid->idx);
err_ret:
        return ret;
}

/**
 * Read-lock the rwlock_sub slot selected by tid->idx.
 *
 * The lock is only kept when table2->ltime is non-zero; with ltime == 0 it
 * is released again and EAGAIN is returned.
 *
 * @param table2 table2 instance owning the lock table
 * @param tid    id whose idx selects the lock slot
 * @return 0 with the lock held; otherwise the ltable_rdlock() error or
 *         EAGAIN, with the lock not held
 */
STATIC int __table2_rdlock_sub(table2_t *table2, const chkid_t *tid)
{
        int ret = ltable_rdlock(&table2->rwlock_sub, tid->idx);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (unlikely(table2->ltime == 0)) {
                /* do not keep a lock on a table2 whose ltime is cleared */
                ret = EAGAIN;
                GOTO(err_lock, ret);
        }

        return 0;

err_lock:
        ltable_unlock(&table2->rwlock_sub, tid->idx);
err_ret:
        return ret;
}

/**
 * Release the rwlock_sub slot selected by tid->idx.
 *
 * A failing ltable_unlock() is handed to UNIMPLEMENTED(__DUMP__); nothing is
 * reported to the caller.
 */
STATIC void __table2_unlock_sub(table2_t *table2, const chkid_t *tid)
{
        int ret = ltable_unlock(&table2->rwlock_sub, tid->idx);

        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }
}
//}}

/**
 * Lock 3.2: one lock per raw chunk.
 */
// {{
/**
 * Acquire the write lock for chkid: first the read lock of table2->table1,
 * then the write lock of the rwlock_table slot chkid->idx.
 *
 * If table2->ltime turns out to be 0 after both locks are held, both are
 * released (slot lock first, table1 lock second) and EAGAIN is returned.
 *
 * @return 0 with both locks held; otherwise an error code with neither held
 */
STATIC int __table2_wrlock(table2_t *table2, const chkid_t *chkid)
{
        int ret;
        table1_t *parent = table2->table1;

        ret = parent->rdlock(parent);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = ltable_wrlock(&table2->rwlock_table, chkid->idx);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        if (table2->ltime != 0) {
                return 0;
        }

        /* ltime == 0: undo both acquisitions and let the caller retry */
        ret = EAGAIN;
        GOTO(err_lock1, ret);

err_lock1:
        ltable_unlock(&table2->rwlock_table, chkid->idx);
err_lock:
        parent->unlock(parent);
err_ret:
        return ret;
}

/**
 * Acquire the read lock for chkid: first the read lock of table2->table1,
 * then the read lock of the rwlock_table slot chkid->idx.
 *
 * With table2->ltime == 0 both locks are dropped again and EAGAIN is
 * returned.
 *
 * @return 0 with both locks held; otherwise an error code with neither held
 */
STATIC int __table2_rdlock(table2_t *table2, const chkid_t *chkid)
{
        int ret;
        table1_t *parent = table2->table1;

        ret = parent->rdlock(parent);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = ltable_rdlock(&table2->rwlock_table, chkid->idx);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        if (unlikely(table2->ltime == 0)) {
                /* release in reverse order of acquisition */
                ret = EAGAIN;
                GOTO(err_lock1, ret);
        }

        return 0;

err_lock1:
        ltable_unlock(&table2->rwlock_table, chkid->idx);
err_lock:
        parent->unlock(parent);
err_ret:
        return ret;
}

/**
 * Counterpart of __table2_wrlock()/__table2_rdlock(): releases the
 * rwlock_table slot chkid->idx, then the table1 lock.
 *
 * A failing ltable_unlock() is handed to UNIMPLEMENTED(__DUMP__).
 */
STATIC void __table2_unlock(table2_t *table2, const chkid_t *chkid)
{
        table1_t *parent = table2->table1;
        int ret = ltable_unlock(&table2->rwlock_table, chkid->idx);

        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }

        parent->unlock(parent);
}
//}}

/**
 * Write-lock the eclock_table slot selected by chkid->idx.
 *
 * The lock is released again and EAGAIN returned when table2->ltime is 0.
 *
 * @return 0 with the lock held; otherwise the ltable_wrlock() error or
 *         EAGAIN, with the lock not held
 */
int table2_ec_wrlock(table2_t *table2, const chkid_t *chkid)
{
        int ret = ltable_wrlock(&table2->eclock_table, chkid->idx);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (table2->ltime != 0) {
                return 0;
        }

        /* ltime == 0: give the lock back */
        ret = EAGAIN;
        GOTO(err_lock, ret);

err_lock:
        ltable_unlock(&table2->eclock_table, chkid->idx);
err_ret:
        return ret;
}

/**
 * Read-lock the eclock_table slot selected by chkid->idx.
 *
 * The lock is released again and EAGAIN returned when table2->ltime is 0.
 *
 * @return 0 with the lock held; otherwise the ltable_rdlock() error or
 *         EAGAIN, with the lock not held
 */
int table2_ec_rdlock(table2_t *table2, const chkid_t *chkid)
{
        int ret = ltable_rdlock(&table2->eclock_table, chkid->idx);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (unlikely(table2->ltime == 0)) {
                /* ltime cleared: the lock must not be kept */
                ret = EAGAIN;
                GOTO(err_lock, ret);
        }

        return 0;

err_lock:
        ltable_unlock(&table2->eclock_table, chkid->idx);
err_ret:
        return ret;
}

/**
 * Release the eclock_table slot selected by chkid->idx.
 *
 * A failing ltable_unlock() is handed to UNIMPLEMENTED(__DUMP__).
 */
void table2_ec_unlock(table2_t *table2, const chkid_t *chkid)
{
        int ret = ltable_unlock(&table2->eclock_table, chkid->idx);

        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }
}

/**
 * Look up the in-memory chkinfo/chkstat pointers of a raw chunk without
 * triggering any load.
 *
 * table2->rwlock is read-locked only while the arrays are indexed; the
 * pointers are handed out after the lock has been released.
 *
 * @param table2   table2 instance to search
 * @param chkid    raw chunk id (asserted to be __RAW_CHUNK__)
 * @param _chkinfo out: entry of chunk_array for chkid->idx
 * @param _chkstat out: entry of chkstat_array for chkid->idx
 * @return 0 on success; ENOENT when idx is beyond chknum or the chunk_array
 *         entry is NULL; otherwise the plock_rdlock() error
 */
STATIC int __table2_chunk_getinfo_noload__(table2_t *table2, const chkid_t *chkid,
                              chkinfo_t **_chkinfo, chkstat_t **_chkstat)
{
        int ret;
        chkinfo_t *info;
        chkstat_t *stat;

        YASSERT(chkid->type == __RAW_CHUNK__);

        ret = plock_rdlock(&table2->rwlock);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* an out-of-range index is treated like an empty slot */
        info = (chkid->idx < table2->chknum) ? table2->chunk_array[chkid->idx] : NULL;
        if (unlikely(info == NULL)) {
                ret = ENOENT;
                goto err_lock;
        }

        stat = table2->chkstat_array[chkid->idx];

        plock_unlock(&table2->rwlock);

        *_chkinfo = info;
        *_chkstat = stat;

        return 0;
err_lock:
        plock_unlock(&table2->rwlock);
err_ret:
        return ret;
}

/**
 * Look up the in-memory chkinfo/chkstat pointers of a raw chunk, loading the
 * L2 chunk on demand.
 *
 * A first lookup that fails with ENOENT triggers __table2_load_l2_chunk()
 * followed by a second lookup. An ENOENT from the second lookup is returned
 * without going through GOTO(); every other failure goes through GOTO().
 *
 * @return 0 on success, otherwise the error of the failing step
 */
STATIC int IO_FUNC __table2_chunk_getinfo__(table2_t *table2, const chkid_t *chkid,
                                    chkinfo_t **_chkinfo, chkstat_t **_chkstat)
{
        int ret;

        ret = __table2_chunk_getinfo_noload__(table2, chkid, _chkinfo, _chkstat);
        if (!unlikely(ret)) {
                return 0;
        }

        if (ret != ENOENT) {
                GOTO(err_ret, ret);
        }

        /* TODO: lazily load the content that belongs to the L2 table proto */
        ret = __table2_load_l2_chunk(table2, chkid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __table2_chunk_getinfo_noload__(table2, chkid, _chkinfo, _chkstat);
        if (unlikely(ret)) {
                if (ret != ENOENT) {
                        GOTO(err_ret, ret);
                }

                /* still absent after the load: plain ENOENT */
                goto err_ret;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Store chkinfo/chkstat into the slot chkinfo->id.idx of chunk_array and
 * chkstat_array.
 *
 * Only the read side of table2->rwlock is taken; as the original comment
 * puts it, this just protects the chunk_array/chkstat_array pointers.
 *
 * @return 0 on success; ENOENT when the index is not below chknum;
 *         otherwise the plock_rdlock() error
 */
STATIC int __table2_set_chunk__(table2_t *table2, chkinfo_t *chkinfo, chkstat_t *chkstat)
{
        int ret;
        const chkid_t *chkid = &chkinfo->id;

        YASSERT(chkid->type == __RAW_CHUNK__);

        ret = plock_rdlock(&table2->rwlock);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (chkid->idx < table2->chknum) {
                table2->chunk_array[chkid->idx] = chkinfo;
                table2->chkstat_array[chkid->idx] = chkstat;

                plock_unlock(&table2->rwlock);

                return 0;
        }

        /* the arrays have no slot for this index */
        ret = ENOENT;
        GOTO(err_lock, ret);

err_lock:
        plock_unlock(&table2->rwlock);
err_ret:
        return ret;
}

/**
 * Copy the in-memory chkinfo (and optionally chkstat) of a raw chunk into
 * caller-provided buffers.
 *
 * @param table2   table2 instance to search
 * @param chkid    raw chunk id (asserted to be __RAW_CHUNK__)
 * @param _chkinfo out: receives a copy via CHKINFO_CP
 * @param _chkstat out, may be NULL: receives a copy via CHKSTAT_CP
 * @return 0 on success, otherwise the __table2_chunk_getinfo__() error
 */
STATIC int __table2_chunk__(table2_t *table2, const chkid_t *chkid,
                            chkinfo_t *_chkinfo, chkstat_t *_chkstat)
{
        int ret;
        chkinfo_t *src_info;
        chkstat_t *src_stat;

        YASSERT(chkid->type == __RAW_CHUNK__);

        ret = __table2_chunk_getinfo__(table2, chkid, &src_info, &src_stat);
        if (unlikely(ret)) {
                return ret;
        }

        CHKINFO_CP(_chkinfo, src_info);
        if (_chkstat) {
                CHKSTAT_CP(_chkstat, src_stat, src_info->repnum);
        }

        return 0;
}

/**
 * Make sure chunk_array/chkstat_array have a slot for chunk_idx.
 *
 * The size is first compared without holding table2->rwlock and checked
 * again once the write lock is held, before the arrays are grown with
 * yrealloc() to chunk_idx + 1 entries.
 *
 * @param table2    table2 instance whose arrays may grow
 * @param chunk_idx index that must become addressable
 * @param op        __OP_READ never grows the arrays
 * @return 0 when the slot exists afterwards; ENOKEY for __OP_READ on an
 *         index beyond chknum; otherwise the lock or yrealloc() error
 */
STATIC int __table2_extend(table2_t *table2, int chunk_idx, int op)
{
        int ret;
        int wanted = chunk_idx + 1;

        if (chunk_idx < (int)table2->chknum) {
                return 0;
        }

        if (op == __OP_READ) {
                ret = ENOKEY;
                goto err_ret;
        }

        DINFO(CHKID_FORMAT" table2:%p chunk_idx:%d chknum:%d\n",
              CHKID_ARG(&table2->table1->chkid), table2, chunk_idx, (int)table2->chknum);

        ret = plock_wrlock(&table2->rwlock);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* re-check under the write lock: chknum may have changed meanwhile */
        if (chunk_idx >= (int)table2->chknum) {
                ret = yrealloc((void **)&table2->chunk_array,
                               sizeof(chkinfo_t *) * table2->chknum,
                               sizeof(chkinfo_t *) * wanted);
                if (unlikely(ret)) {
                        GOTO(err_lock, ret);
                }

                ret = yrealloc((void **)&table2->chkstat_array,
                               sizeof(chkstat_t *) * table2->chknum,
                               sizeof(chkstat_t *) * wanted);
                if (unlikely(ret)) {
                        GOTO(err_lock, ret);
                }

                table2->chknum = wanted;
        }

        plock_unlock(&table2->rwlock);

        return 0;
err_lock:
        plock_unlock(&table2->rwlock);
err_ret:
        return ret;
}

/*
 * If a table_proto modification (insert, update, del) fails, ltime must be reset.
 */
// {{
/**
 * Insert one item into table_proto and clear table2.ltime when that fails.
 *
 * EEXIST is treated specially: it is logged and passed to
 * UNIMPLEMENTED(__DUMP__) without touching ltime; should that macro return,
 * the function carries on and reports success. Any other error zeroes
 * volume_proto->table2.ltime and is returned.
 *
 * @param table_proto  table to insert into
 * @param idx          slot index inside the table
 * @param item         item bytes
 * @param len          length of item
 * @param volume_proto owner of the table2 whose ltime is cleared on failure
 * @return 0 on success, otherwise the insert error (never EEXIST)
 */
STATIC int __table_insert(table_proto_t *table_proto,
                int idx, const char *item, int len, volume_proto_t *volume_proto)
{
        int ret;

        ANALYSIS_BEGIN(0);

        ret = table_proto->insert(table_proto, idx, item, len);
        if (unlikely(ret)) {
                if (ret != EEXIST) {
                        DWARN("reset table "CHKID_FORMAT" insert %d ret:%d\n",
                                        CHKID_ARG(&table_proto->chkid), idx, ret);

                        volume_proto->table2.ltime = 0;
                        GOTO(err_ret, ret);
                }

                DWARN(CHKID_FORMAT, CHKID_ARG(&table_proto->chkid));
                UNIMPLEMENTED(__DUMP__);
        }

        ANALYSIS_QUEUE(0, IO_WARN, "table2_insert");

        return 0;
err_ret:
        return ret;
}

/**
 * Update one item of table_proto; on failure clear table2.ltime.
 *
 * @param table_proto  table to update
 * @param idx          slot index inside the table
 * @param item         new item bytes
 * @param len          length of item
 * @param volume_proto owner of the table2 whose ltime is cleared on failure
 * @return 0 on success, otherwise the update error
 */
STATIC int __table_update(table_proto_t *table_proto,
                int idx, const char *item, int len, volume_proto_t *volume_proto)
{
        int ret = table_proto->update(table_proto, idx, item, len);

        if (!unlikely(ret)) {
                return 0;
        }

        DWARN("reset table "CHKID_FORMAT" update %d ret:%d \n",
              CHKID_ARG(&table_proto->chkid), idx, ret);

        /* a failed modification must reset ltime */
        volume_proto->table2.ltime = 0;
        GOTO(err_ret, ret);

err_ret:
        return ret;
}

/**
 * Delete one item of table_proto; on failure clear table2.ltime.
 *
 * @param table_proto  table to delete from
 * @param idx          slot index inside the table
 * @param volume_proto owner of the table2 whose ltime is cleared on failure
 * @return 0 on success, otherwise the del error
 */
STATIC int __table_del(table_proto_t *table_proto,
                int idx, volume_proto_t *volume_proto)
{
        int ret = table_proto->del(table_proto, idx);

        if (!unlikely(ret)) {
                return 0;
        }

        DWARN("reset table "CHKID_FORMAT" del %d ret:%d \n",
              CHKID_ARG(&table_proto->chkid), idx, ret);

        /* a failed modification must reset ltime */
        volume_proto->table2.ltime = 0;
        GOTO(err_ret, ret);

err_ret:
        return ret;
}

/*
 * A failed batch_del needs to reset ltime too, but that is not implemented yet.
 * Whoever has spare time: please remember to implement it.
 */
// }}

int table2_iterator_callback(const void *value, int loc, int idx, void *ctx)
{
        int ret;
        volume_proto_t *volume_proto;
        chkinfo_t *chkinfo;
        const chkinfo_t *tmp;
        chkstat_t *chkstat;
        table2_t *table2;

        (void) loc;
        (void) idx;

        volume_proto = ctx;
        table2 = &volume_proto->table2;
        tmp = value;

        if (tmp->id.id != volume_proto->chkid.id || tmp->id.type != __RAW_CHUNK__) {
                DERROR("load volume "CHKID_FORMAT"[%d] fail\n",
                       CHKID_ARG(&volume_proto->chkid), idx);
                ret = EIO;
                GOTO(err_ret, ret);
        }

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_extend(table2, tmp->id.idx, __OP_WRITE);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ymalloc((void **)&chkinfo, CHKINFO_SIZE(tmp->repnum));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ymalloc((void **)&chkstat, CHKSTAT_SIZE(tmp->repnum));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memcpy(chkinfo, tmp, CHKINFO_SIZE(tmp->repnum));
        memset(chkstat, 0x0, CHKSTAT_SIZE(tmp->repnum));

        YASSERT(!chkid_isnull(&chkinfo->id));
        YASSERT(table2->chknum > chkinfo->id.idx);

        ret = __table2_set_chunk__(table2, chkinfo, chkstat);
        YASSERT(ret == 0);

        return 0;
err_ret:
        return ret;
}

/**
 * Every operation that changes chkinfo or chkstat must be followed by a call
 * to this function.
 *
 * Publishes the caller's working copies (_chkinfo/_chkstat) into table2:
 * - if info_version is unchanged, only the in-memory chkstat is overwritten
 *   and nothing is written to the table_proto;
 * - otherwise the new chkinfo is first written to the owning table_proto with
 *   __table_update() and only then copied over the in-memory entries
 *   (reallocating them when repnum changed).
 *
 * @param table2       table2 holding the in-memory entries
 * @param _chkinfo     new chunk info; must not be the in-memory entry itself
 * @param _chkstat     new chunk stat; must not be the in-memory entry itself
 * @param volume_proto owning volume (source of table1; ltime is cleared by
 *                     __table_update() on failure)
 * @return 0 on success, otherwise the __table_update() error; lookup and
 *         allocation failures are handled by YASSERT/EXIT/UNIMPLEMENTED
 */
STATIC int __table2_update_item__(table2_t *table2, const chkinfo_t *_chkinfo,
                                  const chkstat_t *_chkstat, volume_proto_t *volume_proto)
{
        int ret, idx;
        table_proto_t *table_proto;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        char item[FILE_PROTO_EXTERN_ITEM_SIZE];
        chkid_t tableid;
        const chkid_t *chkid;
        table1_t *table1;

        chkid = &_chkinfo->id;
        cid2tid(&tableid, chkid);

        /* the chunk must already be present in memory */
        ret = __table2_chunk_getinfo__(table2, chkid, &chkinfo, &chkstat);
        // TODO core, table2->chknum == 0
        YASSERT(ret == 0);

        YASSERT(chkinfo != _chkinfo);
        YASSERT(chkstat != _chkstat);
        /* fast path: same info_version, so only the stat part is refreshed */
        if (likely(chkinfo->info_version == _chkinfo->info_version)) {
                memcpy(chkstat, _chkstat, CHKSTAT_SIZE(_chkinfo->repnum));
                goto out;
        }

        table1 = &volume_proto->table1;
        ret = table1->get_table2_nolock(table1, &table_proto, &tableid);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        DWARN("restart for safe, ret (%u) %s\n", ret, strerror(ret));
                        EXIT(ret);
                } else
                        /* NOTE(review): if UNIMPLEMENTED(__WARN__) returns, table_proto
                         * is used below without having been set -- confirm */
                        UNIMPLEMENTED(__WARN__);
        }

        /* serialize the new chkinfo into the slot of its table */
        memcpy(item, _chkinfo, CHKINFO_SIZE(_chkinfo->repnum));
        idx = chkid->idx  % FILE_PROTO_EXTERN_ITEM_COUNT;

        /* must check table1 chunk before update, like table1->chunk_check */
        ret = __table_update(table_proto, idx, item, CHKINFO_SIZE(_chkinfo->repnum), volume_proto);
        if (unlikely(ret)) {
#if ENABLE_CHUNK_DEBUG
                DWARN("update "CHKID_FORMAT" table idx:%d fail:%d\n", CHKID_ARG(chkid), idx, ret);
#endif
                GOTO(err_ret, ret);
        }

        /* called with the old and the new chkinfo while the old one is still
         * intact; presumably queues cleanup for replicas that were dropped --
         * confirm against chunk_cleanup_compare() */
        chunk_cleanup_compare(table1->pool, &table1->chkid, chkinfo, _chkinfo);

        if (unlikely(chkinfo->repnum != _chkinfo->repnum)) {
                /* replica count changed: resize both entries; chkinfo->repnum
                 * still holds the old count until the memcpy below */
                ret = yrealloc((void **)&chkinfo, CHKINFO_SIZE(chkinfo->repnum),
                               CHKINFO_SIZE(_chkinfo->repnum));
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                ret = yrealloc((void **)&chkstat, CHKSTAT_SIZE(chkinfo->repnum),
                               CHKSTAT_SIZE(_chkinfo->repnum));
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                memcpy(chkinfo, _chkinfo, CHKINFO_SIZE(_chkinfo->repnum));
                memcpy(chkstat, _chkstat, CHKSTAT_SIZE(_chkinfo->repnum));
                /* the pointers may have moved: store them back into table2 */
                ret = __table2_set_chunk__(table2, chkinfo, chkstat);
                YASSERT(ret == 0);
        } else {
                /* same size: overwrite the existing entries in place */
                YASSERT(chkid_cmp(&chkinfo->id, &_chkinfo->id) == 0);
                memcpy(chkinfo, _chkinfo, CHKINFO_SIZE(_chkinfo->repnum));
                memcpy(chkstat, _chkstat, CHKSTAT_SIZE(_chkinfo->repnum));
        }

out:
        return 0;
err_ret:
        return ret;
}

/**
 * Publish the caller's _chkinfo/_chkstat into the in-memory table2 entries
 * without writing anything to the table_proto.
 *
 * With an unchanged info_version only chkstat is overwritten. Otherwise the
 * owning table_proto is looked up (the result itself is not used further)
 * and both entries are overwritten, being reallocated and stored back with
 * __table2_set_chunk__() when repnum changed.
 *
 * @return always 0; lookup and allocation failures are handled by
 *         YASSERT/EXIT/UNIMPLEMENTED
 */
STATIC int __table2_update_chkstat__(table2_t *table2, const chkinfo_t *_chkinfo,
                                  const chkstat_t *_chkstat, volume_proto_t *volume_proto)
{
        int ret;
        table_proto_t *table_proto;
        chkinfo_t *cur_info;
        chkstat_t *cur_stat;
        chkid_t tableid;
        const chkid_t *chkid = &_chkinfo->id;
        table1_t *table1 = &volume_proto->table1;

        cid2tid(&tableid, chkid);

        ret = __table2_chunk_getinfo__(table2, chkid, &cur_info, &cur_stat);
        YASSERT(ret == 0);

        /* fast path: same info_version, refresh the stat part only */
        if (likely(cur_info->info_version == _chkinfo->info_version)) {
                memcpy(cur_stat, _chkstat, CHKSTAT_SIZE(_chkinfo->repnum));
                return 0;
        }

        ret = table1->get_table2_nolock(table1, &table_proto, &tableid);
        if (unlikely(ret)) {
                if (ret != EAGAIN) {
                        UNIMPLEMENTED(__WARN__);
                } else {
                        DWARN("restart for safe, ret (%u) %s\n", ret, strerror(ret));
                        EXIT(ret);
                }
        }

        if (likely(cur_info->repnum == _chkinfo->repnum)) {
                /* same size: overwrite the existing entries in place */
                YASSERT(chkid_cmp(&cur_info->id, &_chkinfo->id) == 0);
                memcpy(cur_info, _chkinfo, CHKINFO_SIZE(_chkinfo->repnum));
                memcpy(cur_stat, _chkstat, CHKSTAT_SIZE(_chkinfo->repnum));
                return 0;
        }

        /* replica count changed: resize both entries, then store them back */
        ret = yrealloc((void **)&cur_info, CHKINFO_SIZE(cur_info->repnum),
                       CHKINFO_SIZE(_chkinfo->repnum));
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }

        ret = yrealloc((void **)&cur_stat, CHKSTAT_SIZE(cur_info->repnum),
                       CHKSTAT_SIZE(_chkinfo->repnum));
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }

        memcpy(cur_info, _chkinfo, CHKINFO_SIZE(_chkinfo->repnum));
        memcpy(cur_stat, _chkstat, CHKSTAT_SIZE(_chkinfo->repnum));
        ret = __table2_set_chunk__(table2, cur_info, cur_stat);
        YASSERT(ret == 0);

        return 0;
}

/**
 * Record one newly created chunk: persist its chkinfo in the owning
 * table_proto, then install heap copies of chkinfo/chkstat in table2.
 *
 * @note _chkinfo is shared between several tasks, so the id of the chunk
 *       handled by this call is passed separately in chkid and written
 *       into every copy that is made here.
 *
 * If the persistent insert fails, every replica listed in the record is
 * handed to chunk_cleanup_push() before the error is returned.
 *
 * @param table2   table2 receiving the in-memory entry
 * @param io_opt   unused
 * @param chkid    id of the chunk being recorded
 * @param _chkinfo template chunk info (id is taken from chkid instead)
 * @param _chkstat template chunk stat
 * @return 0 on success, otherwise the __table_insert() error
 */
static int __table2_chunk_create_insert(table2_t *table2, io_opt_t *io_opt, const chkid_t *chkid,
                                        const chkinfo_t *_chkinfo, const chkstat_t *_chkstat)
{
        int ret, slot, repnum = _chkinfo->repnum;
        char item[FILE_PROTO_EXTERN_ITEM_SIZE];
        chkinfo_t *record = (void *)item;       /* on-disk image built in item[] */
        chkinfo_t *mem_info;
        chkstat_t *mem_stat;
        table_proto_t *table_proto;
        volume_proto_t *volume_proto = table2->volume_proto;
        table1_t *table1 = &volume_proto->table1;
        chkid_t tableid;

        (void) io_opt;

        ANALYSIS_BEGIN(0);

        cid2tid(&tableid, chkid);
        ret = table1->get_table2_nolock(table1, &table_proto, &tableid);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        DWARN("restart for safe, ret (%u) %s\n", ret, strerror(ret));
                        EXIT(ret);
                }

                UNIMPLEMENTED(__DUMP__);
                YASSERT(ret != ENOENT);
        }

        YASSERT(table_proto->chkid.type == __VOLUME_SUB_CHUNK__);

        /* TODO: this writes 64B, which is unfriendly to SSDs */
        memcpy(item, _chkinfo, CHKINFO_SIZE(repnum));
        record->id = *chkid;

        YASSERT(!chkid_isnull(chkid));

        /* persistent part: the table2 meta record */
        slot = chkid->idx % FILE_PROTO_EXTERN_ITEM_COUNT;
        ret = __table_insert(table_proto, slot, item, CHKINFO_SIZE(repnum), volume_proto);
        if (unlikely(ret)) {
                YASSERT(ret != EEXIST);

                /* the record was not stored: queue every replica for cleanup */
                for (int i = 0; i < _chkinfo->repnum; i++) {
                        chunk_cleanup_push(table1->pool, &table1->chkid,
                                           chkid, &record->diskid[i].id,
                                           record->info_version);
                }

                GOTO(err_ret, ret);
        }

        /* in-memory part; TODO MAX */
        ret = ymalloc((void **)&mem_info, CHKINFO_SIZE(repnum));
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }

        ret = ymalloc((void **)&mem_stat, CHKSTAT_SIZE(repnum));
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }

        memcpy(mem_info, _chkinfo, CHKINFO_SIZE(repnum));
        memcpy(mem_stat, _chkstat, CHKSTAT_SIZE(repnum));
        mem_info->id = *chkid;

        ret = __table2_set_chunk__(table2, mem_info, mem_stat);
        YASSERT(ret == 0);

        ANALYSIS_QUEUE(0, IO_WARN, "table2_chunk_create");

        return 0;
err_ret:
        return ret;
}

#if ENABLE_TABLE2_INSERT_PARALLEL

/*
 * Argument block handed to one insert task
 * (see __table2_chunk_create_insert_wrapper, which also frees it).
 */
typedef struct {
        /* data shared by all tasks of one batch */
        table2_t *table2;
        io_opt_t *io_opt;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;

        /* id of the single chunk this task records */
        const chkid_t *chkid;

        /* completion signalling towards the task that started the batch */
        co_cond_t *cond;
        /* counter of outstanding tasks, decremented when the task finishes */
        int *task_count;
        /* NOTE(review): not written by any code visible in this part of the file */
        int retval;
} __table2_insert_ctx_t;

/*
 * Completion step shared by the success and the failure path of the insert
 * task: drop the outstanding-task counter, wake the waiter with retval and
 * free the context.
 */
static void __table2_insert_task_done(__table2_insert_ctx_t *ctx, int retval)
{
        (*ctx->task_count)--;
        co_cond_broadcast(ctx->cond, retval);
        yfree((void **)&ctx);
}

/**
 * Task entry point: runs __table2_chunk_create_insert() for the chunk
 * described by the context, then signals completion.
 *
 * The result is reported through co_cond_broadcast() (0 on success, the
 * error code otherwise). The context is freed here in both cases.
 *
 * @param arg a heap-allocated __table2_insert_ctx_t
 */
static void __table2_chunk_create_insert_wrapper(void *arg) {
        int ret;
        __table2_insert_ctx_t *ctx = arg;

        SCHEDULE_LEASE_SET();

        ANALYSIS_BEGIN(0);

        ret = __table2_chunk_create_insert(ctx->table2, ctx->io_opt, ctx->chkid, ctx->chkinfo, ctx->chkstat);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(0, IO_WARN, "__table2_chunk_create_insert_wrapper");

        DBUG("task %d\n", *ctx->task_count);

        YASSERT(*ctx->task_count > 0);

        __table2_insert_task_done(ctx, 0);
        return;
err_ret:
        __table2_insert_task_done(ctx, ret);
}

#endif

/*
static int __table2_chunk_create_insert(table2_t *table2, io_opt_t *io_opt,
                                        const chkinfo_t *_chkinfo, const chkstat_t *_chkstat,
                                        const buffer_t *initdata, volume_proto_t *volume_proto)
*/

/**
 * Call chunk_cleanup_push() for every (chunk, node) pair, chunk by chunk,
 * always with 0 as the last argument.
 *
 * @param pool   pool name passed through
 * @param parent parent volume id passed through
 * @param chkids array of count chunk ids
 * @param count  number of entries in chkids
 * @param nid    array of repnum node ids
 * @param repnum number of entries in nid
 */
static void __table2_chunk_force_cleanup(const char *pool, const volid_t *parent,
                                         const chkid_t *chkids, int count, const nid_t *nid, int repnum)
{
        int c, r;

        for (c = 0; c < count; c++) {
                const chkid_t *chkid = &chkids[c];

                for (r = 0; r < repnum; r++) {
                        chunk_cleanup_push(pool, parent, chkid, &nid[r], 0);
                }
        }
}

/**
 * Record chknum newly created chunks by running one insert task per chunk
 * (__table2_chunk_create_insert_wrapper) and waiting for all of them.
 *
 * chkinfo/chkstat are shared by every task; each task gets its own chkid.
 * New tasks are only started while task_count is not greater than
 * TABLE2_INSERT_MAX_TASK. If the function is still throttled after more than
 * gloconf.rpc_timeout / 2 has elapsed since entry, the chunks that were not
 * submitted yet are handed to __table2_chunk_force_cleanup(), the running
 * tasks are drained and EAGAIN is returned.
 *
 * @param table2  table2 receiving the entries
 * @param io_opt  passed through to the insert tasks
 * @param chkids  array of chknum chunk ids
 * @param chknum  number of chunks
 * @param nid     replica nodes, only used for the forced cleanup
 * @param repnum  number of entries in nid
 * @param chkinfo shared chunk info template
 * @param chkstat shared chunk stat template
 * @return 0 when every wait returned 0; EAGAIN on timeout or when at least
 *         one wait returned non-zero; the ymalloc() error if a task context
 *         could not be allocated
 *
 * NOTE(review): the throttle uses '>' so up to TABLE2_INSERT_MAX_TASK + 1
 * tasks can be outstanding -- confirm whether '>=' was intended.
 * NOTE(review): the err_cond path drains running tasks but, unlike the
 * timeout path, does not push the not-yet-submitted chunks to cleanup.
 * NOTE(review): __table2_insert_ctx_t and the err_cond label only exist when
 * ENABLE_TABLE2_INSERT_PARALLEL is set, but are used unconditionally here.
 */
STATIC int __table2_chunk_create_multi__(table2_t *table2, io_opt_t *io_opt,
                                         const chkid_t *chkids, int chknum,
                                         const nid_t *nid, int repnum,
                                         chkinfo_t *chkinfo, chkstat_t *chkstat)
{
        int ret, i;
        int task_count = 0, err_count = 0;
        co_cond_t cond;
        __table2_insert_ctx_t *ctx;
        time_t begin = gettime();
        table1_t *table1 = table2->table1;

        co_cond_init(&cond);

        for (i = 0; i < chknum; i++) {
                /* throttle: wait for a slot before submitting chunk i */
                while (task_count > TABLE2_INSERT_MAX_TASK) {
                        if (gettime() - begin > gloconf.rpc_timeout / 2) {
                                DWARN("create fail, finished %u recyle %u\n", i, chknum - i);
                                /* chunks i..chknum-1 were never submitted: queue them for cleanup */
                                __table2_chunk_force_cleanup(table1->pool, &table1->chkid,
                                                             &chkids[i], chknum - i, nid, repnum);

                                /* drain: the tasks reference task_count and cond on this stack */
                                while (task_count > 0) {
                                        ret = co_cond_wait2(&cond, __FUNCTION__);
                                        if (unlikely(ret)) {
                                                err_count++;
                                        }
                                }

                                ret = EAGAIN;
                                GOTO(err_ret, ret);
                        } else {
                                /* presumably returns the value a finished task passed to
                                 * co_cond_broadcast() -- confirm */
                                ret = co_cond_wait2(&cond, __FUNCTION__);
                                if (unlikely(ret)) {
                                        err_count++;
                                }
                        }
                }

                /* the context is freed by the task wrapper when it finishes */
                ret = ymalloc((void **)&ctx, sizeof(__table2_insert_ctx_t));
                if (unlikely(ret))
                        GOTO(err_cond, ret);

                memset(ctx, 0x0, sizeof(__table2_insert_ctx_t));

                // shared data
                ctx->table2 = table2;
                ctx->io_opt = io_opt;
                ctx->chkinfo = chkinfo;
                ctx->chkstat = chkstat;

                // private data
                ctx->chkid = &chkids[i];
                YASSERT(!chkid_isnull(&chkids[i]));

                ctx->task_count = &task_count;
                ctx->cond = &cond;

                schedule_task_new(__FUNCTION__, __table2_chunk_create_insert_wrapper, ctx, -1);
                task_count++;
        }

        /* everything submitted: wait until the tasks have counted task_count down to 0 */
        while (task_count > 0) {
                ret = co_cond_wait2(&cond, __FUNCTION__);
                if (unlikely(ret)) {
                        err_count++;
                }
        }

        /* individual error codes are not kept; any failure is reported as EAGAIN */
        if (err_count > 0) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        return 0;
#if ENABLE_TABLE2_INSERT_PARALLEL
err_cond:
        /* allocation failed: still wait for the tasks that are already running */
        while (task_count > 0) {
                co_cond_wait2(&cond, __FUNCTION__);
        }
#endif
err_ret:
        return ret;
}



/**
 * Create chknum chunks on the given replica nodes and record them in table2.
 *
 * Steps, in order: chunk_ops.create() for all chunks, volume_proto_renew(),
 * then the metadata insert -- directly via __table2_chunk_create_insert()
 * for a single chunk, via __table2_chunk_create_multi__() otherwise. All
 * chunks share one chkinfo/chkstat template built on the stack.
 *
 * @param table2       table2 receiving the entries
 * @param io_opt       passed through to the insert step
 * @param chkids       array of chknum chunk ids
 * @param chknum       number of chunks
 * @param nid          replica nodes
 * @param repnum       number of entries in nid
 * @param initdata     passed through to chunk_ops.create()
 * @param volume_proto owning volume
 * @return 0 on success, otherwise the error of the failing step
 */
STATIC int __table2_chunk_create_multi(table2_t *table2, io_opt_t *io_opt,
                                       const chkid_t *chkids, int chknum, const nid_t *nid, int repnum,
                                       const buffer_t *initdata, volume_proto_t *volume_proto)
{
        int ret;
        char _chkstat[CHKSTAT_MAX], _chkinfo[CHKINFO_MAX];
        chkstat_t *chkstat = (void *)_chkstat;
        chkinfo_t *chkinfo = (void *)_chkinfo;
        table1_t *table1 = &volume_proto->table1;
        const fileinfo_t *fileinfo = &table1->fileinfo;

        DBUG("create "CHKID_FORMAT" count %u\n", CHKID_ARG(&chkids[0]), chknum);

        ANALYSIS_BEGIN(0);

        /* persistent part: disk bitmap and sqlite record */
        ret = volume_proto->chunk_ops.create(table1->pool, chkids, chknum, nid, repnum,
                                             &volume_proto->chkid, net_getnid(),
                                             0, 0, initdata, &fileinfo->ec);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* TODO: if a later step fails, the space allocated above must be reclaimed */

        ANALYSIS_QUEUE(0, IO_WARN, "__table2_chunk_create_multi");

        /* template shared by all chunks; the id is filled in per chunk */
        memset(chkstat, 0x0, CHKSTAT_SIZE(repnum));
        memset(chkinfo, 0x0, sizeof(*chkinfo));

        diskid2loc(chkinfo->diskid, nid, repnum);
        chkinfo->repnum = repnum;

        ANALYSIS_BEGIN(1);

        ret = volume_proto_renew(volume_proto);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (chknum != 1) {
                ret = __table2_chunk_create_multi__(table2, io_opt, chkids, chknum, nid, repnum, chkinfo, chkstat);
        } else {
                /* single chunk: insert directly, no tasks needed */
                chkinfo->id = chkids[0];
                ret = __table2_chunk_create_insert(table2, io_opt, &chkids[0], chkinfo, chkstat);
        }

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(1, IO_WARN, "__table2_chunk_create_multi2");

        return 0;
err_ret:
        return ret;
}

/**
 * Create a single chunk described by _chkinfo and record it in table2.
 *
 * @todo If the persistent information is only partially updated (some steps
 * succeed, some fail), the chunk check and chunk create logic goes wrong and
 * the chunk can no longer be written.
 *
 * Metadata update order:
 * - disk bitmap
 * - sqlite
 * - meta
 *
 * @param table2       table2 receiving the in-memory entry
 * @param io_opt       not used by this function
 * @param _chkinfo     chunk info of the chunk to create (id, replicas, version)
 * @param _chkstat     chunk stat copied into the in-memory entry
 * @param initdata     passed through to chunk_ops.create()
 * @param volume_proto owning volume
 * @return 0 on success, otherwise the error of chunk_ops.create() or
 *         __table_insert()
 *
 * NOTE(review): unlike __table2_chunk_create_insert(), a failed
 * __table_insert() here does not push the replicas to chunk cleanup.
 */
static int __table2_chunk_create____(table2_t *table2, io_opt_t *io_opt, const chkinfo_t *_chkinfo,
                                     const chkstat_t *_chkstat, const buffer_t *initdata, volume_proto_t *volume_proto)
{
        int ret, idx, i;
        table_proto_t *table_proto;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        char item[FILE_PROTO_EXTERN_ITEM_SIZE];
        chkid_t tableid;
        const chkid_t *chkid;
        table1_t *table1;
        const fileinfo_t *fileinfo;
        nid_t nid[LICH_REPLICA_MAX];

        ANALYSIS_BEGIN(0);

        table1 = &volume_proto->table1;
        fileinfo = &table1->fileinfo;
        chkid = &_chkinfo->id;

        /* flatten the replica locations into a plain node-id array;
         * assumes repnum <= LICH_REPLICA_MAX -- TODO confirm */
        for (i = 0; i < _chkinfo->repnum; i++) {
                nid[i] = _chkinfo->diskid[i].id;
        }

        // persistence info: disk bitmap and sqlite record
        ret = volume_proto->chunk_ops.create(volume_proto->table1.pool, &_chkinfo->id, 1,
                                             nid, _chkinfo->repnum, &volume_proto->chkid,
                                             net_getnid(), 0, _chkinfo->info_version,
                                             initdata, &fileinfo->ec);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* find the table_proto that owns the slot of this chunk */
        cid2tid(&tableid, chkid);
//retry:
        ret = table1->get_table2_nolock(table1, &table_proto, &tableid);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        DWARN("restart for safe, ret (%u) %s\n", ret, strerror(ret));
                        EXIT(ret);
                }

                UNIMPLEMENTED(__DUMP__);
                YASSERT(ret != ENOENT);
        }

        // TODO fault-injection point (simulate a failure here)

        YASSERT(table_proto->chkid.type == __VOLUME_SUB_CHUNK__);

        // TODO this writes 64B, which is unfriendly to SSDs
        memcpy(item, _chkinfo, CHKINFO_SIZE(_chkinfo->repnum));

        // persistence info: table2 meta record
        idx = chkid->idx % FILE_PROTO_EXTERN_ITEM_COUNT;
        ret = __table_insert(table_proto, idx, item, CHKINFO_SIZE(_chkinfo->repnum), volume_proto);
        if (unlikely(ret)) {
                YASSERT(ret != EEXIST);
                GOTO(err_ret, ret);
        }

        /* in-memory part: heap copies that table2 keeps from now on */
        // TODO MAX
        ret = ymalloc((void **)&chkinfo, CHKINFO_SIZE(_chkinfo->repnum));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        ret = ymalloc((void **)&chkstat, CHKSTAT_SIZE(_chkinfo->repnum));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        memcpy(chkinfo, _chkinfo, CHKINFO_SIZE(_chkinfo->repnum));
        memcpy(chkstat, _chkstat, CHKSTAT_SIZE(_chkinfo->repnum));

        ret = __table2_set_chunk__(table2, chkinfo, chkstat);
        YASSERT(ret == 0);

        ANALYSIS_QUEUE(0, IO_WARN, "table2_chunk_create");

        return 0;
err_ret:
        return ret;
}

/**
 * Run chunk_ops.sync on one raw chunk and store the resulting chkinfo/chkstat.
 *
 * Sequence: chunk check (as __OP_WRITE), load a private copy of the chunk's
 * info/stat and the vfm, refresh the lease, call chunk_ops.sync, write the
 * vfm back if its clock changed, update the table item, refresh the lease
 * again.
 *
 * @param table2 table2 instance owning the chunk
 * @param chkid  chunk to sync; asserted to be of type __RAW_CHUNK__
 * @param oflags passed through to the check and sync operations
 * @return 0 on success, otherwise the error code of the failing step
 */
STATIC int __table2_chunk_sync__(table2_t *table2, const chkid_t *chkid, int *oflags)
{
        int ret;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        char _chkstat[CHKSTAT_MAX], _chkinfo[CHKINFO_MAX], _vfm[VFM_SIZE(VFM_COUNT_MAX)];
        const fileinfo_t *fileinfo;
        volume_proto_t *volume_proto = table2->volume_proto;
        vfm_t *vfm;
        uint64_t clock;

        DBUG("sync "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        ret = __table2_chunk_check__(table2, chkid, __OP_WRITE, oflags);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // work on stack copies of chkinfo/chkstat
        chkinfo = (void *)_chkinfo;
        chkstat = (void *)_chkstat;

        ret = __table2_chunk__(table2, chkid, chkinfo, chkstat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        vfm = (void *)_vfm;
        ret = table2->vfm_get(table2, chkid, vfm);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // remember the vfm clock so a change made by sync can be detected below
        clock = vfm->clock;

        ANALYSIS_BEGIN(0);

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(chkid->type == __RAW_CHUNK__);
        fileinfo = &volume_proto->table1.fileinfo;
        ret = volume_proto->chunk_ops.sync(table2->table1->pool, chkinfo, chkstat,
                                           vfm, &volume_proto->chkid, 1,
                                           &volume_proto->lease.token,
                                           &fileinfo->ec, oflags);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // only write the vfm back when sync changed its clock
        if (clock != vfm->clock) {
                ret = table2->vfm_set(table2, chkid, &clock, vfm, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ANALYSIS_END(0, 1000 * 1000, "chunk_proto_sync");

        ret = __table2_update_item__(table2, chkinfo, chkstat, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Apply a replica status change to one chunk and store the result.
 *
 * Loads a private copy of the chunk's info/stat, lets chunk_proto_set() apply
 * @status for replica @nid, and stores the copy back only when
 * chunk_proto_set() reports (via its out flag) that something was changed.
 *
 * @param table2       table2 instance owning the chunk
 * @param chkid        chunk to modify
 * @param nid          replica node the status applies to
 * @param status       new status; __S_CHECK takes the chkstat-only update path
 * @param volume_proto owning volume, forwarded to the update helpers
 * @return 0 on success, otherwise the error code of the failing step
 */
STATIC int __table2_chunk_set__(table2_t *table2, const chkid_t *chkid,
                                const nid_t *nid, int status, volume_proto_t *volume_proto)
{
        int ret, seted;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        char _chkstat[CHKSTAT_MAX], _chkinfo[CHKINFO_MAX];

        chkinfo = (void *)_chkinfo;
        chkstat = (void *)_chkstat;
        ret = __table2_chunk__(table2, chkid, chkinfo, chkstat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = chunk_proto_set(chkinfo, chkstat, nid, status, &seted);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (seted) {
                if (__S_CHECK == status) {
                        /**
                         * Pulling a disk triggers a large number of events;
                         * for performance this state is not persisted.
                         *
                         * @todo the information is lost in these cases:
                         * - the volume is reloaded
                         * - the volume controller is switched
                         */

                        ret = __table2_update_chkstat__(table2, chkinfo, chkstat, volume_proto);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                } else {
                        ret = __table2_update_item__(table2, chkinfo, chkstat, volume_proto);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Release everything owned by a table2 instance.
 *
 * Frees every populated chunk_array / chkstat_array slot, then the two arrays
 * themselves, destroys the three lock tables and resets chknum to 0.
 * A NULL @table2 is accepted and ignored.
 */
void table2_destroy(table2_t *table2)
{
        int slot;

        if (!table2)
                return;

        if (table2->chunk_array != NULL) {
                for (slot = 0; slot < (int)table2->chknum; slot++) {
                        /* empty slots own nothing */
                        if (table2->chunk_array[slot] == NULL)
                                continue;

                        /* a populated chunk slot must have a matching stat slot */
                        YASSERT(table2->chkstat_array[slot]);

                        yfree((void **)&table2->chunk_array[slot]);
                        yfree((void **)&table2->chkstat_array[slot]);
                }

                yfree((void **)&table2->chunk_array);
                yfree((void **)&table2->chkstat_array);

                table2->chunk_array = NULL;
                table2->chkstat_array = NULL;
        }

        ltable_destroy(&table2->rwlock_sub);
        ltable_destroy(&table2->rwlock_table);
        ltable_destroy(&table2->eclock_table);

        table2->chknum = 0;
}

/**
 * Call chunk_ops.unlink on every chunk currently held in chunk_array.
 *
 * ENOENT from unlink is ignored and the walk continues; any other error
 * stops the walk and is returned.
 *
 * @param table2 table2 instance whose chunks are unlinked
 * @return 0 on success, otherwise the first non-ENOENT unlink error
 */
int table2_cleanup(table2_t *table2)
{
        int ret, slot;
        volume_proto_t *volume_proto = table2->volume_proto;

        if (table2->chunk_array == NULL)
                return 0;

        for (slot = 0; slot < (int)table2->chknum; slot++) {
                if (table2->chunk_array[slot] == NULL)
                        continue;

                YASSERT(table2->chkstat_array[slot]);

                ret = volume_proto->chunk_ops.unlink(table2->chunk_array[slot],
                                                     table2->chkstat_array[slot]);
                if (unlikely(ret) && ret != ENOENT) {
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Run chunk_ops.check on one raw chunk and store the resulting chkinfo/chkstat.
 *
 * Sequence: load a private copy of the chunk's info/stat and the vfm, refresh
 * the lease, call chunk_ops.check, write the vfm back if its clock changed,
 * update the table item, refresh the lease again.
 *
 * @param table2 table2 instance owning the chunk
 * @param chkid  chunk to check; asserted to be of type __RAW_CHUNK__
 * @param op     not referenced by this function
 * @param oflags passed through to chunk_ops.check
 * @return 0 on success, otherwise the error code of the failing step
 */
STATIC int __table2_chunk_check__(table2_t *table2, const chkid_t *chkid, int op, int *oflags)
{
        int ret;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        char _chkinfo[CHKINFO_MAX], _chkstat[CHKSTAT_MAX], _vfm[VFM_SIZE(VFM_COUNT_MAX)];
        const fileinfo_t *fileinfo;
        vfm_t *vfm;
        uint64_t clock;
        volume_proto_t *volume_proto = table2->volume_proto;

        DBUG("chunk "CHKID_FORMAT" check\n", CHKID_ARG(chkid));

        YASSERT(chkid->type == __RAW_CHUNK__);

        // work on stack copies of chkinfo/chkstat
        chkinfo = (void *)_chkinfo;
        chkstat = (void *)_chkstat;
        ret = __table2_chunk__(table2, chkid, chkinfo, chkstat);
        if (unlikely(ret)) {
                // ENOENT leaves through a plain goto instead of the GOTO macro
                if (ret == ENOENT) {
                        goto err_ret;
                } else {
                        GOTO(err_ret, ret);
                }
        }

        vfm = (void *)_vfm;
        ret = table2->vfm_get(table2, chkid, vfm);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // remember the vfm clock so a change made by check can be detected below
        clock = vfm->clock;

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        fileinfo = &volume_proto->table1.fileinfo;
        ret = volume_proto->chunk_ops.check(table2->table1->pool, chkinfo,
                                            chkstat, vfm, &volume_proto->chkid, 1,
                                            &volume_proto->lease.token, &fileinfo->ec, oflags);
        if (unlikely(ret)) {
                DBUG("chunk "CHKID_FORMAT"\n", CHKID_ARG(chkid));
                GOTO(err_ret, ret);
        }

        // only write the vfm back when check changed its clock
        if (clock != vfm->clock) {
                ret = table2->vfm_set(table2, chkid, &clock, vfm, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ret = __table2_update_item__(table2, chkinfo, chkstat, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/* Argument block handed to the table2 self-check task. */
typedef struct {
        chkid_t tid; /* id of the sub-table whose chunks are checked */
} args_t;
        

/**
 * Task body: check every raw chunk slot of one sub-table, then clean up its vfm.
 *
 * Walks the FILE_PROTO_EXTERN_ITEM_COUNT chunk indexes that belong to the
 * sub-table in args->tid, calling volume_ctl_chunk_check() on each, and
 * finally calls volume_ctl_vfm_cleanup(). The argument block is freed on both
 * the success and the failure path.
 *
 * @param _args heap-allocated args_t; ownership passes to this function
 */
static void __table2_vfm_selfcheck__(void *_args)
{
        int ret, retry;
        args_t *args = _args;
        volid_t volid, chkid, *tid;
        uint32_t min, max, i;

        tid = &args->tid;
        // volume id: same id as the sub-table, index 0, type __VOLUME_CHUNK__
        volid.id = tid->id;
        volid.idx = 0;
        volid.type = __VOLUME_CHUNK__;

        chkid = *tid;
        chkid.type = __RAW_CHUNK__;

        // raw chunk index range [min, max) covered by this sub-table
        min = tid->idx * FILE_PROTO_EXTERN_ITEM_COUNT;
        max = (tid->idx + 1) * FILE_PROTO_EXTERN_ITEM_COUNT;

        DINFO(CHKID_FORMAT" self check\n", CHKID_ARG(tid));

        for (i = min; i < max; i++) {
                chkid.idx = i;
                retry = 0;
        retry:
                ret = volume_ctl_chunk_check(&volid, &chkid);
                if (ret) {
                        // a missing chunk is skipped
                        if (ret == ENOENT)
                                continue;
                        else if (ret == EAGAIN) {
                                // presumably sleeps and jumps back to the retry label a bounded
                                // number of times, then goes to err_ret — confirm against the macro
                                USLEEP_RETRY(err_ret, ret, retry, retry, 10, (100 * 1000));
                        } else
                                GOTO(err_ret, ret);
                }
        }

        DINFO(CHKID_FORMAT" self check success\n", CHKID_ARG(tid));

        ret = volume_ctl_vfm_cleanup(&volid, tid);
        if (unlikely(ret)) {
                DINFO("cleaunup "CHKID_FORMAT" fail\n", CHKID_ARG(tid));
                GOTO(err_ret, ret);
        }

        DINFO("cleaunup "CHKID_FORMAT" success\n", CHKID_ARG(tid));

        yfree((void **)&_args);

        return;
err_ret:
        // also reached when the vfm cleanup fails, not only when a chunk check fails
        DINFO(CHKID_FORMAT" self check fail\n", CHKID_ARG(tid));
        yfree((void **)&_args);
        return;
}

/**
 * Schedule a background self check for sub-table @tid.
 *
 * Currently disabled: the unconditional return inside the #if 1 block makes
 * everything after it unreachable, so no task is ever scheduled.
 *
 * @param tid id of the sub-table to check
 */
static void __table2_vfm_selfcheck(const chkid_t *tid)
{
        int ret;
        args_t *args;

#if 1
        return;
#endif

        // unreachable while the #if 1 above is in place
        ret = ymalloc((void **)&args, sizeof(*args));
        if (ret)
                UNIMPLEMENTED(__DUMP__);

        args->tid = *tid;

        // the task body (__table2_vfm_selfcheck__) frees args
        schedule_task_new("table2_selfcheck", __table2_vfm_selfcheck__, args, -1);
}

/**
 * Run vfm_add_check() for a list of nodes against one raw chunk's vfm.
 *
 * Loads the chunk info and the current vfm, calls vfm_add_check() once per
 * entry of @nid, and writes the vfm back (followed by a self-check request for
 * the owning sub-table) only if the vfm clock changed. The lease is refreshed
 * before and after the write-back.
 *
 * @param table2 table2 instance owning the chunk
 * @param chkid  chunk to update; asserted to be of type __RAW_CHUNK__
 * @param nid    array of @count node ids
 * @param count  number of entries in @nid
 * @return 0 on success, otherwise the error code of the failing step
 */
STATIC int __table2_chunk_vfm_add__(table2_t *table2, const chkid_t *chkid, const nid_t *nid, int count)
{
        int ret, i;
        char _vfm[VFM_SIZE(VFM_COUNT_MAX)];
        vfm_t *vfm;
        uint64_t clock;
        volume_proto_t *volume_proto = table2->volume_proto;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;

        // only the first node of the list is logged
        DBUG("chunk "CHKID_FORMAT" vfm add %s\n", CHKID_ARG(chkid), network_rname(nid));

        YASSERT(chkid->type == __RAW_CHUNK__);

        ret = __table2_chunk_getinfo__(table2, chkid, &chkinfo, &chkstat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        vfm = (void *)_vfm;
        ret = table2->vfm_get(table2, chkid, vfm);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // remember the vfm clock so a change made by vfm_add_check can be detected below
        clock = vfm->clock;

        for (i = 0; i < count; i++) {
                ret = vfm_add_check(vfm, &nid[i], chkinfo);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        DINFO("chunk "CHKID_FORMAT" vfm add %s\n", CHKID_ARG(chkid), network_rname(nid));

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // write back only when the vfm clock moved
        if (clock != vfm->clock) {
                ret = table2->vfm_set(table2, chkid, &clock, vfm, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                // request a self check of the sub-table owning this chunk
                chkid_t tid;
                cid2tid(&tid, chkid);
                __table2_vfm_selfcheck(&tid);
        }

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Pick the nodes that will hold the replicas of a new chunk.
 *
 * When @localize is set, the pool is writeable locally and the cluster is not
 * in solo mode, the local node takes slot 0 and dispatch_newdisk() fills the
 * remaining slots; otherwise dispatch_newdisk() picks all of them.
 *
 * @param pool      pool name handed to diskmd_pool_writeable / dispatch_newdisk
 * @param chkid     unused
 * @param repnum    requested replica count, asserted <= LICH_REPLICA_MAX
 * @param localize  non-zero to prefer a replica on the local node
 * @param diskid    output array of selected node ids
 * @param nid_count output: number of entries written to @diskid
 * @return 0 on success; ENOSPC when fewer than @repnum nodes were obtained
 *         outside solo mode; otherwise the dispatch_newdisk() error
 */
STATIC int __table2_chunk_newinfo(const char *pool, const chkid_t *chkid,
                                  int repnum, int localize, nid_t *diskid, int *nid_count)
{
        int ret, flag;
        int count, left, repmin;

        (void) chkid;

        ANALYSIS_BEGIN(0);

        flag = __NEWDISK_BALANCE__;

        count = repnum;
        YASSERT(repnum <= LICH_REPLICA_MAX);

        if (localize && diskmd_pool_writeable(pool, 0) && !cluster_is_solomode()) {
                DBUG("localhost replica\n");
                diskid[0] = *net_getnid();

                left = count - 1;
                if (left) {
                        // NOTE(review): diskid/count sit in the argument positions that receive
                        // NULL/0 in the other branch, yet only diskid[0] has been written here —
                        // confirm dispatch_newdisk does not read diskid[1..count-1] through them
                        ret = dispatch_newdisk(&diskid[1], &left, left, pool, diskid, count, flag);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }

                count = left + 1;
        } else {
                DBUG("localhost closed\n");

                // solo mode accepts a single replica as the minimum
                repmin = cluster_is_solomode() ? 1 : count;
                ret = dispatch_newdisk(diskid, &count, repmin, pool, NULL, 0, flag);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        // outside solo mode a short allocation is reported as ENOSPC
        if (repnum != (int)count && !cluster_is_solomode()) {
                ret = ENOSPC;
                GOTO(err_ret, ret);
        }

        *nid_count = count;
#if 0
        diskid2loc(chkinfo->diskid, diskid, count);

        chkinfo->id = *chkid;
        chkinfo->repnum = count;
        YASSERT(count);
#endif

        ANALYSIS_QUEUE(0, IO_WARN, "table2_chunk_newinfo");

        //CHKINFO_DUMP(chkinfo, D_INFO);

        return 0;
err_ret:
        return ret;
}

/**
 * Create @chknum chunks; this function takes no table2 lock itself.
 *
 * Determines the wanted replica count, selects target nodes through
 * __table2_chunk_newinfo() (reducing the count on ENOSPC where the build
 * configuration / solo mode allows it), optionally prepares a zero-filled
 * initial buffer, and hands everything to __table2_chunk_create_multi().
 *
 * @param table2       table2 instance
 * @param chkid        array of @chknum chunk ids
 * @param chknum       number of chunk ids
 * @param io_opt       io options; localize and fill are read here
 * @param volume_proto owning volume
 * @return 0 on success, otherwise the error code of the failing step
 */
STATIC int __table2_chunk_create_nolock(table2_t *table2, const chkid_t *chkid,
                                        int chknum, io_opt_t *io_opt,
                                        volume_proto_t *volume_proto)
{
        int ret, count, need, repnum;
        buffer_t _buf, *buf;
        nid_t nid[LICH_REPLICA_MAX];
        ec_t *ec;

        DBUG("create "CHKID_FORMAT" localize %u\n", CHKID_ARG(chkid), io_opt->localize);

        ANALYSIS_BEGIN(0);

        ec = &volume_proto->table1.fileinfo.ec;
#if ENABLE_EC
        // EC log chunks want m - k + 1 copies, everything else the user replica count
        if (EC_ISEC(ec) && eclog_chunk_islog(chkid, ec)) {
                need = ec->m - ec->k + 1;
        } else {
                need = volume_proto->table1.fileinfo.repnum_usr;
        }
#else
        (void) ec;
        need = volume_proto->table1.fileinfo.repnum_usr;
#endif
        count = need;
        YASSERT(count);
        // retry node selection with one replica less each time ENOSPC is returned,
        // down to LICH_REPLICA_MIN; without ENABLE_REPLICA_FALLBACK only in solo mode
        while (1) {
                ret = __table2_chunk_newinfo(volume_proto->table1.pool, chkid,
                                             count, io_opt->localize, nid, &repnum);
                if (unlikely(ret)) {
#if ENABLE_REPLICA_FALLBACK
                        if (ret == ENOSPC && count > LICH_REPLICA_MIN) {
                                count--;
                                DERROR(""CHKID_FORMAT" repnum %d\n",
                                       CHKID_ARG(&table2->table1->chkid), count);
                                continue;
                        } else {
                                GOTO(err_ret, ret);
                        }
#else
                        if (cluster_is_solomode()) {
                                if (ret == ENOSPC && count > LICH_REPLICA_MIN) {
                                        count--;
                                        DERROR(""CHKID_FORMAT" repnum %d\n",
                                                        CHKID_ARG(&table2->table1->chkid), count);
                                        continue;
                                } else {
                                        GOTO(err_ret, ret);
                                }
                        } else
                                GOTO(err_ret, ret);
#endif
                }

                break;
        }

#if ENABLE_EC
        // an EC volume does not accept a reduced replica count
        if (EC_ISEC(ec)) {
                if (count != need) {
                        ret = ENOSPC;
                        GOTO(err_ret, ret);
                }
        }
#endif

        // with io_opt->fill the new chunks are initialised from a zero buffer
        // of LICH_CHUNK_SPLIT bytes; otherwise no initial data is passed
        if (io_opt->fill) {
                buf = &_buf;
                mbuffer_init(buf, 0);
                mbuffer_appendzero(buf, LICH_CHUNK_SPLIT);
        } else {
                buf = NULL;
        }

        // repnum is the node count actually returned by __table2_chunk_newinfo
        ret = __table2_chunk_create_multi(table2, io_opt, chkid, chknum,
                                          nid, repnum, buf, volume_proto);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        if (buf)
                mbuffer_free(buf);

        ANALYSIS_QUEUE(0, IO_WARN, "table2_chunk_create");

        return 0;
err_free:
        if (buf)
                mbuffer_free(buf);
err_ret:
        return ret;
}

/**
 * Create chunks while holding the write lock of each owning sub-table.
 *
 * A single chunk is created under the lock of its sub-table. For several
 * chunks the (ordered) id array is processed as consecutive groups that share
 * one sub-table; each group is created under that sub-table's write lock and
 * the lock is released before the next group is started.
 *
 * The multi-chunk loop is driven by the number of ids still to process, not by
 * the numeric range of sub-table indexes, so id arrays that skip a sub-table
 * are handled without reading past the end of @chkid.
 *
 * @param table2       table2 instance
 * @param chkid        array of @chknum chunk ids, asserted to be ordered
 * @param chknum       number of chunk ids
 * @param io_opt       io options forwarded to the create call
 * @param volume_proto owning volume
 * @return 0 on success, otherwise the lock or create error
 */
STATIC int __table2_chunk_create_wrlocksub(table2_t *table2, const chkid_t *chkid,
                                        int chknum, io_opt_t *io_opt,
                                        volume_proto_t *volume_proto)
{
        int ret, off = 0, len, left = chknum;
        chkid_t tid;
        const chkid_t *cur;

        if (chknum == 1) {
                cid2tid(&tid, chkid);

                ret = __table2_wrlock_sub(table2, &tid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __table2_chunk_create_nolock(table2, chkid, chknum, io_opt, volume_proto);
                if (unlikely(ret)) {
                        GOTO(err_lock, ret);
                }

                __table2_unlock_sub(table2, &tid);
        } else {
                YASSERT(chkid_ordered(chkid, chknum));

                while (left > 0) {
                        /* first id of the next group decides the sub-table */
                        cur = chkid + off;
                        cid2tid(&tid, cur);

                        /* length of the run of ids that live in this sub-table */
                        len = chkid_sametid(cur, left, &tid);
                        YASSERT(len);

                        DBUG("lock "CHKID_FORMAT" tid "CHKID_FORMAT" off:%d len:%d left:%d\n",
                                        CHKID_ARG(cur), CHKID_ARG(&tid), off, len, left);

                        ret = __table2_wrlock_sub(table2, &tid);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        ret = __table2_chunk_create_nolock(table2, cur, len, io_opt, volume_proto);
                        if (unlikely(ret)) {
                                GOTO(err_lock, ret);
                        }

                        __table2_unlock_sub(table2, &tid);

                        off += len;
                        left -= len;
                }

                YASSERT(left == 0);
                YASSERT(off == chknum);
        }

        return 0;
err_lock:
        /* tid still names the sub-table whose lock is held */
        __table2_unlock_sub(table2, &tid);
err_ret:
        return ret;

}

/**
 * Tell whether any replica of the chunk sits on the local node.
 *
 * @param chkinfo chunk info whose replica list is scanned
 * @return 1 if net_islocal() accepts at least one replica, 0 otherwise
 */
STATIC int __table2_localized__(const chkinfo_t *chkinfo)
{
        int rep = 0;

        while (rep < (int)chkinfo->repnum) {
                if (net_islocal(&chkinfo->diskid[rep].id))
                        return 1;

                rep++;
        }

        return 0;
}

/**
 * Report whether a chunk has a local replica and whether it needs rollback.
 *
 * Runs under the chunk read lock. *rollback is 1 only when @snap_rollback is
 * given and differs from the chunk's snap_version.
 *
 * @param table2        table2 instance owning the chunk
 * @param chkid         chunk to inspect
 * @param snap_rollback optional snapshot version to compare against, may be NULL
 * @param localized     output: result of __table2_localized__()
 * @param rollback      output: 1 if the versions differ, else 0
 * @return 0 on success, otherwise the lock or lookup error (ENOENT included)
 */
STATIC int __table2_chunk_stat__(table2_t *table2, const chkid_t *chkid,
                                 const uint64_t *snap_rollback, int *localized, int *rollback)
{
        int ret;
        chkinfo_t *info;
        chkstat_t *stat;

        // TODO conflicts with the lock taken by the flat procedure: both lock at chunk
        // level and may interleave, which can make stat time out
        ret = __table2_rdlock(table2, chkid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __table2_chunk_getinfo__(table2, chkid, &info, &stat);
        if (unlikely(ret)) {
                /* ENOENT leaves through a plain goto, everything else through GOTO */
                if (ret != ENOENT) {
                        GOTO(err_lock, ret);
                }

                goto err_lock;
        }

        *localized = __table2_localized__(info);
        *rollback = (snap_rollback != NULL && *snap_rollback != info->snap_version) ? 1 : 0;

        __table2_unlock(table2, chkid);

        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        return ret;
}

/**
 * Snapshot a chunk's info/stat (and optionally its vfm) before an IO.
 *
 * Fetches the vfm when @vfm is non-NULL, then, under the chunk read lock,
 * calls chunk_proto_clock() for @op and copies the chunk's info and stat into
 * the caller's buffers.
 *
 * The ANALYSIS_BEGIN/ANALYSIS_END pair brackets only the locked section;
 * failures that happen before ANALYSIS_BEGIN (vfm fetch, lock acquisition)
 * return without running ANALYSIS_END.
 *
 * @param table2   table2 instance owning the chunk
 * @param chkid    chunk the IO targets
 * @param _chkinfo output: copy of the chunk info
 * @param _chkstat output: copy of the chunk stat
 * @param vfm      optional output buffer for the vfm, may be NULL
 * @param clock    passed to chunk_proto_clock()
 * @param op       IO operation, passed to chunk_proto_clock()
 * @return 0 on success, otherwise the error code of the failing step
 */
STATIC int __table2_pre_io(table2_t *table2, const chkid_t *chkid,
                           chkinfo_t *_chkinfo, chkstat_t *_chkstat, vfm_t *vfm, uint64_t *clock,
                           int op)
{
        int ret;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;

        if (likely(vfm)) {
                ret = table2->vfm_get(table2, chkid, vfm);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ret = __table2_rdlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_BEGIN(0);

        ret = __table2_chunk_getinfo__(table2, chkid, &chkinfo, &chkstat);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        chunk_proto_clock(chkinfo, chkstat, clock, op);
        CHKINFO_CP(_chkinfo, chkinfo);
        CHKSTAT_CP(_chkstat, chkstat, chkinfo->repnum);

        __table2_unlock(table2, chkid);

        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);
        return 0;
err_lock:
        /* only reached after ANALYSIS_BEGIN, so the pair stays balanced */
        __table2_unlock(table2, chkid);
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);
err_ret:
        return ret;
}

/**
 * Call chunk_proto_reset() on a chunk's info/stat under the chunk write lock.
 *
 * @param table2 table2 instance owning the chunk
 * @param chkid  chunk to reset
 * @return 0 on success, otherwise the lock or lookup error
 */
STATIC int __table2_reset(table2_t *table2, const chkid_t *chkid)
{
        int ret;
        chkinfo_t *info;
        chkstat_t *stat;

        ANALYSIS_BEGIN(0);

        ret = __table2_wrlock(table2, chkid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __table2_chunk_getinfo__(table2, chkid, &info, &stat);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        /* the reset is applied to the entries returned by the lookup */
        chunk_proto_reset(info, stat);
        __table2_unlock(table2, chkid);

        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return ret;
}

/**
 * Verify after an IO that the chunk state captured earlier is still current.
 *
 * Under the chunk read lock, compares the caller's copies (@_chkinfo,
 * @_chkstat) with the live entries: the chkstat magic, the info_version and
 * every replica's ltime must match, otherwise EAGAIN is returned.
 *
 * @param table2   table2 instance owning the chunk
 * @param chkid    chunk the IO targeted
 * @param _chkinfo chunk info copy held by the caller
 * @param _chkstat chunk stat copy held by the caller; its magic must be non-zero
 * @param clock    clock value of the IO; asserted not to exceed the live chkstat_clock
 * @return 0 if nothing changed, EAGAIN on a mismatch, otherwise the lock or lookup error
 */
STATIC int __table2_post_io(table2_t *table2, const chkid_t *chkid,
                            const chkinfo_t *_chkinfo, const chkstat_t *_chkstat,
                            uint64_t clock)
{
        int ret, i;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;

        ret = __table2_rdlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);


        ret = __table2_chunk_getinfo__(table2, chkid, &chkinfo, &chkstat);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        /*
         * For an io write nothing may go wrong between pre_io and post_io.
         * If chkstat gets reloaded in between, the io's clock may end up
         * greater than the clock stored in chkstat.
         */
        YASSERT(_chkstat->magic != 0);
        // table2 may have been reloaded by now; chkstat->magic and chkstat->clock may both be 0
        if (_chkstat->magic != chkstat->magic) {
                ret = EAGAIN;
                DBUG("clock1 %ju magic1 %u clock2 %ju magic2 %u\n",
                      chkstat->chkstat_clock,
                      chkstat->magic,
                      clock,
                      _chkstat->magic);
                GOTO(err_lock, ret);
        }

        YASSERT(chkstat->chkstat_clock >= clock);

        // the chunk info changed while the IO was in flight
        if (unlikely(chkinfo->info_version != _chkinfo->info_version)) {
                DBUG(CHKID_FORMAT" postio check fail %ld %ld\n", CHKID_ARG(chkid),
                      chkinfo->info_version, _chkinfo->info_version);
                ret = EAGAIN;
                GOTO(err_lock, ret);
        }

        // a replica's ltime changed while the IO was in flight
        for (i = 0; i < (int)chkinfo->repnum; i++) {
                if (unlikely(chkstat->repstat[i].ltime != _chkstat->repstat[i].ltime)) {
                        DBUG(CHKID_FORMAT" postio check fail\n", CHKID_ARG(chkid));
                        ret = EAGAIN;
                        GOTO(err_lock, ret);
                }
        }

        __table2_unlock(table2, chkid);

        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        return ret;
}

/**
 * Tell whether the sub-table at chkid->idx exists but has not loaded its items.
 *
 * @param table2 table2 instance
 * @param chkid  id whose idx selects the table1 slot; asserted to be in range
 * @return 1 if the slot is populated and item_loaded is 0, otherwise 0
 */
static int __table2_is_l2_chunk_needload(table2_t *table2, const chkid_t *chkid)
{
        table1_t *table1 = table2->table1;
        table_proto_t *proto;

        YASSERT(table1->table_count > chkid->idx);

        proto = table1->table_array[chkid->idx];
        if (proto == NULL)
                return 0;

        return proto->item_loaded == 0;
}

/**
 * Check a sub-table chunk before its items are loaded.
 *
 * Currently a no-op: the #if 1 block returns 0 immediately, so the check /
 * update code below is compiled but never executed.
 *
 * @param table_proto sub-table to check
 * @param table1      owning table1
 * @return always 0 while the early return is in place
 */
static int __table2_pre_load(table_proto_t *table_proto, table1_t *table1)
{
        int ret;
        char item[FILE_PROTO_ITEM_SIZE];
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        char _chkinfo[CHKINFO_MAX], _chkstat[CHKSTAT_MAX];
        table_proto_t *parent;
        volume_proto_t *volume_proto = table1->volume_proto;
        const fileinfo_t *fileinfo;

#if 1
        /**
         * this function maybe change the subvol chunk info. but never get __table1_wrlock, reference table1->chunk_check
         * and we already modify __table2_chunk_getinfo function always exec __table2_check first
         * by the previous bug fix for lazy load(commit 65d1ba7b74a10191636793c76934bd897aa45ca3)
         * so, this function should not be called.
         */
        return 0;
#endif

        // ---- unreachable while the #if 1 above is in place ----

        parent = table1->table_proto;
        fileinfo = &table1->fileinfo;
        chkinfo = (void *)_chkinfo;
        chkstat = (void *)_chkstat;

        // work on stack copies of the sub-table's chkinfo/chkstat
        CHKINFO_CP(chkinfo, table_proto->chkinfo);
        CHKSTAT_CP(chkstat, table_proto->chkstat, chkinfo->repnum);

        YASSERT(chkinfo->id.type != __RAW_CHUNK__);

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = volume_proto->chunk_ops.check(table1->pool, chkinfo, chkstat, NULL,
                                            &parent->chkinfo->id, 1,
                                            &volume_proto->lease.token,
                                            &fileinfo->ec, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        CHKINFO_DUMP(chkinfo, D_INFO);

        // write the record back to the parent table only if check changed info_version
        if (table_proto->chkinfo->info_version != chkinfo->info_version)  {
                DINFO("chunk "CHKID_FORMAT" update\n", CHKID_ARG(&chkinfo->id));
                memcpy(item, chkinfo, CHKINFO_SIZE(chkinfo->repnum));
                ret = __table_update(parent, chkinfo->id.idx, item, CHKINFO_SIZE(chkinfo->repnum), volume_proto);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                YASSERT(table_proto->chkinfo->repnum == chkinfo->repnum);
        }

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // publish the checked copies back into the sub-table
        CHKINFO_CP(table_proto->chkinfo, chkinfo);
        CHKSTAT_CP(table_proto->chkstat, chkstat, chkinfo->repnum);

        return 0;
err_ret:
        return ret;
}

/**
 * Load the vfm of one sub-table into table1->vfm_array, if not loaded yet.
 *
 * Reads the TABLE_PROTO_INFO_VFM info of the sub-table. An empty result
 * installs an empty vfm (count 0, clock 0); otherwise the data is
 * base64-decoded and a heap copy of the decoded vfm is installed.
 * This function takes no lock itself.
 *
 * @param table2 table2 instance
 * @param tid    sub-table id; tid->idx selects the table_array / vfm_array slot
 * @return 0 on success (including "already loaded"), otherwise the getinfo or
 *         allocation error
 */
static int __table2_vfm_load__(table2_t *table2, const chkid_t *tid)
{
        int ret, len, len1;
        char buf[LICH_BLOCK_SIZE], tmp[VFM_SIZE(VFM_COUNT_MAX)];
        table_proto_t *table_proto = table2->table1->table_array[tid->idx];
        vfm_t *vfm;

        // already loaded
        if (table2->table1->vfm_array[tid->idx].vfm) {
                goto out;
        }

        YASSERT(table_proto);
        len = LICH_BLOCK_SIZE;
        ret = table_proto->getinfo(table_proto, buf, &len, TABLE_PROTO_INFO_VFM);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        // nothing stored yet: install an empty vfm
        if (len == 0) {
                DBUG("load "CHKID_FORMAT" vfm empty, %u\n",
                     CHKID_ARG(&table_proto->chkid), len);

                ret = ymalloc((void **)&vfm, VFM_SIZE(0));
                if (ret)
                        GOTO(err_ret, ret);

                vfm->count = 0;
                vfm->clock = 0;
                table2->table1->vfm_array[tid->idx].vfm = vfm;
                table2->table1->vfm_array[tid->idx].lock = 0;

                goto out;
        }

        // NOTE(review): the result of base64_decode is not checked, and nothing here
        // bounds the decoded length (len1) against sizeof(tmp) — confirm that the
        // stored data can never decode to more than VFM_SIZE(VFM_COUNT_MAX) bytes
        base64_decode(buf, &len1, tmp);

        ret = ymalloc((void **)&vfm, len1);
        if (ret)
                GOTO(err_ret, ret);

        memcpy(vfm, tmp, len1);

        // buf is reused here as the output buffer for the printable vfm dump
        vfm_dump(vfm, buf);

        table2->table1->vfm_array[tid->idx].vfm = vfm;
        table2->table1->vfm_array[tid->idx].lock = 0;

        DINFO("load "CHKID_FORMAT" vfm %s\n",
              CHKID_ARG(&table_proto->chkid), buf);

out:
        return 0;
err_ret:
        return ret;
}

/**
 * Lock-taking wrapper around __table2_vfm_load__().
 *
 * Returns 0 straight away when the vfm for @tid is already cached or when
 * the sub-volume table has no table_proto. Otherwise the load runs with the
 * sub-table write lock held.
 *
 * @return 0 on success, otherwise the error from locking or loading.
 */
static int __table2_vfm_load(table2_t *table2, const chkid_t *tid)
{
        int ret;
        table1_t *table1 = table2->table1;

        /* unlocked fast path; the loader re-tests the vfm slot itself */
        if (table1->vfm_array[tid->idx].vfm || table1->table_array[tid->idx] == NULL)
                return 0;

        ret = __table2_wrlock_sub(table2, tid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_vfm_load__(table2, tid);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __table2_unlock_sub(table2, tid);

        return 0;
err_lock:
        __table2_unlock_sub(table2, tid);
err_ret:
        return ret;
}

/**
 * Load the entries of sub-volume table @tid by iterating its table_proto.
 *
 * Runs __table2_pre_load() on the table_proto, then walks it with
 * table_proto->iterator(), handing every entry to table2_iterator_callback
 * with table2->volume_proto as the callback argument. The table's
 * info_version is sampled before the walk and asserted unchanged afterwards.
 *
 * No lock is taken here; the caller visible in this part of the file holds
 * the sub-table write lock around the call.
 *
 * @return 0 on success, otherwise the error from pre-load or the iterator.
 */
STATIC int __table2_load_l2_chunk__(table2_t *table2, const chkid_t *tid)
{
        int ret;
        table_proto_t *table_proto = NULL;
        uint64_t info_version;

        YASSERT(table2->volume_proto != NULL);
        // table_proto == NULL: the subvol has not been allocated yet
        // NOTE(review): table_proto is dereferenced after __table2_pre_load()
        // succeeds; presumably pre-load rejects a NULL table_proto - confirm.
        ANALYSIS_BEGIN(0);

        table_proto = table2->table1->table_array[tid->idx];

        ret = __table2_pre_load(table_proto, table2->table1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // remember the version so a change during the walk is detected below
        info_version = table_proto->chkinfo->info_version;
        ret = table_proto->iterator(table_proto, table2_iterator_callback,
                                    -1, table2->volume_proto);

        if (unlikely(ret)) {
                // table_proto_destroy(table_proto);
                DERROR("chkid %s ret %d\n", id2str(tid), ret);
                GOTO(err_ret, ret);
        }

        YASSERT(info_version == table_proto->chkinfo->info_version);

        ANALYSIS_QUEUE(0, 1000 * 1000, "__table2_load_l2_chunk__");

        DBUG("load subvol "CHKID_FORMAT" table_proto %p success\n",
              CHKID_ARG(tid), table_proto);

        return 0;
err_ret:
        // the failure path is checked against three times the RPC timeout
        ANALYSIS_ASSERT(0, 1000 * 1000 * (_get_rpc_timeout() * 3), NULL);
        return ret;
}

/**
 * Load the sub-volume table that owns @chkid if it has not been loaded yet.
 *
 * The need-load test is first made under the sub-table read lock. Only when
 * it reports work to do is the read lock dropped and the write lock taken;
 * the test is then repeated under the write lock before the table entries
 * and the vfm are loaded.
 *
 * @return 0 on success (including "nothing to load"), otherwise the error
 *         from locking or loading.
 */
STATIC int __table2_load_l2_chunk(table2_t *table2, const chkid_t *chkid)
{
        int ret;
        chkid_t subvol;

        cid2tid(&subvol, chkid);

        /* cheap probe under the shared lock */
        ret = __table2_rdlock_sub(table2, &subvol);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (!__table2_is_l2_chunk_needload(table2, &subvol)) {
                __table2_unlock_sub(table2, &subvol);
                return 0;
        }

        /* swap the shared lock for the exclusive one */
        __table2_unlock_sub(table2, &subvol);

        ret = __table2_wrlock_sub(table2, &subvol);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* test again: the lock was released in between */
        if (__table2_is_l2_chunk_needload(table2, &subvol)) {
                DBUG("chunk "CHKID_FORMAT" load\n", CHKID_ARG(&subvol));

                ret = __table2_load_l2_chunk__(table2, &subvol);
                if (unlikely(ret)) {
                        GOTO(err_lock, ret);
                }

                ret = __table2_vfm_load__(table2, &subvol);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }

        __table2_unlock_sub(table2, &subvol);

        return 0;
err_lock:
        __table2_unlock_sub(table2, &subvol);
err_ret:
        return ret;
}

/**
 * Call table_proto->load() for a sub-volume table that still needs loading.
 *
 * The need-load test and the load both run under the sub-table write lock.
 *
 * The table1 chunk must NOT be checked from here: that requires
 * __table1_wrlock (not __table1_rdlock) to be held first, so such a check
 * belongs in table1.c.
 *
 * @return 0 on success (including "nothing to load"), otherwise the error
 *         from locking or from load().
 */
STATIC int __table2_load_bmap(table2_t *table2, table_proto_t *table_proto)
{
        int ret;
        chkid_t *subvol = &table_proto->chkid;

        YASSERT(subvol->type == __VOLUME_SUB_CHUNK__);

        ret = __table2_wrlock_sub(table2, subvol);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* test under the lock before loading */
        if (__table2_is_l2_chunk_needload(table2, subvol)) {
                ret = table_proto->load(table_proto);
                if (unlikely(ret)) {
                        GOTO(err_lock, ret);
                }
        }

        __table2_unlock_sub(table2, subvol);

        return 0;
err_lock:
        __table2_unlock_sub(table2, subvol);
err_ret:
        return ret;
}

/**
 * Report, under the chunk read lock, whether raw chunk @chkid needs a check.
 *
 * @param _check        out: non-zero when chunk_proto_consistent() says the
 *                      chunk is not consistent with its stat and the vfm
 * @param _localized    optional out: result of __table2_localized__()
 * @param snap_version  optional out: chkinfo->snap_version
 * @param volume_proto  unused
 * @return 0 on success; ENOENT when the chunk has no info; other errors from
 *         locking or from vfm_get().
 */
STATIC int __table2_needcheck(table2_t *table2, const chkid_t *chkid, int *_check,
                              int *_localized, uint64_t *snap_version, volume_proto_t *volume_proto)
{
        int ret;
        chkinfo_t *info;
        chkstat_t *stat;
        char vfm_buf[VFM_SIZE(VFM_COUNT_MAX)];
        vfm_t *vfm = (void *)vfm_buf;

        (void) volume_proto;

        YASSERT(chkid->type == __RAW_CHUNK__);

        ret = __table2_rdlock(table2, chkid);
        if (unlikely(ret)) {
                YASSERT(ret != ENOENT);
                GOTO(err_ret, ret);
        }

        ret = __table2_chunk_getinfo__(table2, chkid, &info, &stat);
        if (unlikely(ret)) {
                /* ENOENT leaves through a plain goto rather than GOTO */
                if (ret != ENOENT)
                        GOTO(err_lock, ret);

                goto err_lock;
        }

        ret = table2->vfm_get(table2, chkid, vfm);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        *_check = !chunk_proto_consistent(info, stat, vfm);

        if (_localized)
                *_localized = __table2_localized__(info);

        if (snap_version)
                *snap_version = info->snap_version;

        DBUG("chunk "CHKID_FORMAT" need check %u\n", CHKID_ARG(&info->id), *_check);

        __table2_unlock(table2, chkid);

        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        return ret;
}

/*
 * __table2_ec_chunk_redo: EC-specific pre-check step for a raw chunk.
 *
 * With ENABLE_EC the function
 *   1. returns 0 at once when the volume is not EC or the chunk is an eclog
 *      chunk;
 *   2. under the chunk read lock, fetches chkinfo/chkstat into local buffers
 *      and asks chunk_proto_ec_check_needredo() whether a redo and/or a
 *      recovery is required;
 *   3. after dropping the read lock, runs eclog_chunk_redo() when a redo is
 *      required (only if ECLOG_ENABLE is set);
 *   4. when a recovery is required, takes the chunk write lock, runs
 *      chunk_proto_ec_check_recovery() and stores the result through
 *      __table2_update_item__().
 *
 * Without ENABLE_EC the function is a stub that returns 0.
 */
#if ENABLE_EC
STATIC int __table2_ec_chunk_redo(table2_t *table2, const chkid_t *chkid, int op,
                volume_proto_t *volume_proto)
{
        int ret, redo, recovery;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        char _chkinfo[CHKINFO_MAX], _chkstat[CHKSTAT_MAX];
        const fileinfo_t *fileinfo;
        ec_t *ec;

        clockstat_t clockstat;
        clockstat_t clocks[EC_MMAX];
        unsigned char src_in_err[EC_MMAX] = {0};

        ec = &volume_proto->table1.fileinfo.ec;
        if (!EC_ISEC(ec) || eclog_chunk_islog(chkid, ec)) {
                goto out;
        }

        YASSERT(chkid->type == __RAW_CHUNK__);

        ret = __table2_rdlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* chkinfo/chkstat live in the local buffers, so they stay usable
         * after the read lock is released below */
        chkinfo = (void *)_chkinfo;
        chkstat = (void *)_chkstat;
        ret = __table2_chunk__(table2, chkid, chkinfo, chkstat);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        fileinfo = &volume_proto->table1.fileinfo;
        ret = chunk_proto_ec_check_needredo(table2->table1->pool, chkinfo, chkstat, &volume_proto->chkid, op, 1,
                                fileinfo->priority, fileinfo->attr, &fileinfo->ec,
                                &clockstat, clocks, src_in_err, &redo, &recovery);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __table2_unlock(table2, chkid);

        /* no chunk lock is held from here until the recovery branch */
        if (redo) {
#if ECLOG_ENABLE
                ret = eclog_chunk_redo(volume_proto, chkinfo, chkstat,
                                &fileinfo->ec, clocks, src_in_err);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
#endif
        }

        if (recovery) {
                /* NOTE(review): chkinfo/chkstat are the copies taken under the
                 * earlier read lock; they are not re-read after the write lock
                 * is acquired - confirm this is intended. */
                ret = __table2_wrlock(table2, chkid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = chunk_proto_ec_check_recovery(table2->table1->pool, chkinfo, chkstat, &volume_proto->chkid, op,
                                fileinfo->priority, fileinfo->attr, &fileinfo->ec,
                                &clockstat, clocks, src_in_err);
                if (unlikely(ret))
                        GOTO(err_lock, ret);

                ret = __table2_update_item__(table2, chkinfo, chkstat, volume_proto);
                if (unlikely(ret))
                        GOTO(err_lock, ret);

                __table2_unlock(table2, chkid);
        }

out:
        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        return ret;
}
#else
/* EC support compiled out: nothing to redo. */
STATIC int __table2_ec_chunk_redo(table2_t *table2, const chkid_t *chkid, int op,
                volume_proto_t *volume_proto)
{
        (void) table2;
        (void) chkid;
        (void) op;
        (void) volume_proto;

        return 0;
}
#endif


/**
 * Make sure raw chunk @chkid is usable for @op, repairing it when needed.
 *
 * Steps, in order:
 *  1. call table1->extend() for the owning sub-volume index and
 *     __table2_extend() for the chunk index, then table1->chunk_check() on
 *     the owning sub-volume table;
 *  2. ask __table2_needcheck() whether the chunk is inconsistent, whether it
 *     is localized, and for its snap_version;
 *  3. if inconsistent: run __table2_ec_chunk_redo(), then
 *     __table2_chunk_check__() under the chunk write lock;
 *  4. if @localize is set, the chunk is not localized and
 *     diskmd_pool_writeable() accepts the pool: call chunk_bh_localize().
 *
 * @param table2
 * @param chkid          raw chunk id (cid)
 * @param op             operation code passed through to the check helpers
 * @param localize       non-zero to request localization of the chunk
 * @param _snap_version  optional out: the chunk's snap_version
 * @param oflags         passed through to __table2_chunk_check__()
 * @return 0 on success, otherwise an errno value from the failing step
 */
STATIC int IO_FUNC __table2_chunk_check(table2_t *table2, const chkid_t *chkid, int op,
                                int localize, uint64_t *_snap_version, int *oflags)
{
        int ret, check = 0, localized;
        volume_proto_t *volume_proto = table2->volume_proto;
        table1_t *table1;
        fileid_t subvol;
        uint64_t snap_version;

        ANALYSIS_BEGIN(0);

        table1 = &volume_proto->table1;
        cid2tid(&subvol, chkid);

        ret = table1->extend(table1, subvol.idx);
        if (unlikely(ret)) {
                YASSERT(ret != ENOENT);                
                GOTO(err_ret, ret);
        }

        /* a failure here is only asserted to be ENOKEY and then ignored */
        ret = __table2_extend(table2, chkid->idx, op);
        if (unlikely(ret)) {
                YASSERT(ret == ENOKEY);
        }

        ret = table1->chunk_check(table1, &subvol, op, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_needcheck(table2, chkid, &check, &localized, &snap_version, volume_proto);
        if (unlikely(ret)) {
                /* ENOENT leaves through a plain goto rather than GOTO */
                if (ret == ENOENT)
                        goto err_ret;
                else
                        GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(0, IO_WARN, "__table2_chunk_check_0");

        /* log the snap_version value itself, not the address of the local */
        DBUG("chkid %s snap_version %llu check %d\n", id2str(chkid),
             (unsigned long long)snap_version, check);

        if (unlikely(check)) {
                ret = __table2_ec_chunk_redo(table2, chkid, op, volume_proto);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ANALYSIS_BEGIN(1);

                ret = __table2_wrlock(table2, chkid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __table2_chunk_check__(table2, chkid, op, oflags);
                if (unlikely(ret)) {
                        GOTO(err_lock, ret);
                }

                __table2_unlock(table2, chkid);

                ANALYSIS_QUEUE(1, IO_WARN / 2, "__table2_chunk_check_1");
        }

        ANALYSIS_BEGIN(2);

        if (localize && !localized && diskmd_pool_writeable(table1->pool, 0)) {
#if 0
                ret = __table2_chunk_localize(table2, chkid, volume_proto);
                if (unlikely(ret)) {
                        if (ret == EAGAIN || ret == ENOSPC) {
                                DWARN("localize "CHKID_FORMAT" fail\n", CHKID_ARG(chkid));
                                goto out;
                        } else if (ret == EEXIST) {
                                goto out;
                        } else
                                GOTO(err_ret, ret);
                }
#else
                ret = chunk_bh_localize(&volume_proto->chkid, chkid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
#endif

                DBUG("localize "CHKID_FORMAT"\n", CHKID_ARG(chkid));
        }

        if (_snap_version)
                *_snap_version = snap_version;
        
        ANALYSIS_QUEUE(2, IO_WARN, "__table2_chunk_check_2");
//out:
        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        if (ret == ENOENT) {
                DBUG("chunk %s op %d check %d localize %d\n", id2str(chkid), op, check, localize);
        }
        return ret;
}

/**
 * Decide, under the chunk read lock, whether raw chunk @chkid is intact with
 * respect to @vfm.
 *
 * In the variant currently compiled in (#if 1), chunk_proto_intact() is only
 * consulted when at least one replica disk of the chunk is listed in @vfm;
 * otherwise the chunk is reported intact without further inspection.
 *
 * @param intact  out: 1 when intact (or skipped), otherwise the result of
 *                chunk_proto_intact()
 * @param vfm     must not be NULL (asserted)
 * @return 0 on success; ENOENT when the chunk has no info; other errors from
 *         locking or from __table2_chunk_getinfo__().
 */
STATIC int __table2_cleanup_intact(table2_t *table2, const chkid_t *chkid, int *intact, const vfm_t *vfm)
{
        int ret;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;

        DBUG("check "CHKID_FORMAT"\n", CHKID_ARG(chkid));
        
        YASSERT(chkid->type == __RAW_CHUNK__);

        /* disabled: a full chunk_check before inspecting the chunk */
#if 0
        ret = table2->chunk_check(table2, chkid, __OP_READ, 0, NULL, NULL);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        goto err_ret;
                } else {
                        GOTO(err_ret, ret);
                }
        }
#endif

        ret = __table2_rdlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_chunk_getinfo__(table2, chkid, &chkinfo, &chkstat);
        if (unlikely(ret)) {
                /* ENOENT leaves through a plain goto rather than GOTO */
                if (ret == ENOENT) {
                        goto err_lock;
                } else {
                        GOTO(err_lock, ret);
                }
        }

#if 1
        /* despite its name, `unintact` counts the replicas whose disk id is
         * present in the vfm */
        int i, unintact;
        YASSERT(vfm);
        unintact = 0;
        for (i = 0; i < (int)chkinfo->repnum; i++) {
                if (vfm_exist(vfm, &chkinfo->diskid[i].id)) {
                        unintact++;
                }
        }

        if (unintact) {
                *intact = chunk_proto_intact(chkinfo, chkstat);
        } else {
                /* no replica is on a disk listed in the vfm: skip the check */
                DBUG("skip check "CHKID_FORMAT"\n", CHKID_ARG(chkid));
                *intact = 1;
        }
#else
        *intact = chunk_proto_intact(chkinfo, chkstat);
#endif

        __table2_unlock(table2, chkid);

        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        return ret;
}

/**
 * Evaluate one raw chunk (index @idx of @fileid) and report it through
 * @func3.
 *
 * With CHUNK_UNINTACT_VFM:
 *   - a chunk_check is run first only when @deep is non-zero;
 *   - the number of replicas whose disk id appears in the vfm is counted and
 *     @func3 is called for every chunk with that count.
 * Without CHUNK_UNINTACT_VFM:
 *   - a chunk_check is always run first (@deep is not consulted);
 *   - @func3 is called only when chunk_proto_unintact() returns non-zero.
 *
 * @func3 is invoked while the chunk read lock is held.
 *
 * @return 0 on success; ENOENT when the chunk does not exist; other errors
 *         from the check, vfm_get(), locking or getinfo.
 */
STATIC int __table2_chunk_unintact(table2_t *table2, const fileid_t *fileid,
                                   func3_t func3, void *_arg, uint64_t idx, int deep)
{
        int ret, unintact;
        chkid_t chkid, _fileid;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;

        //chknum = table2->chknum;

        //DBUG("file "CHKID_FORMAT" count %u\n",
        //CHKID_ARG(fileid), chknum);

        fid2cid(&chkid, fileid, idx);

#if CHUNK_UNINTACT_VFM
        if (deep) {
                DBUG(CHKID_FORMAT" deep scan\n", CHKID_ARG(&chkid));
                ret = table2->chunk_check(table2, &chkid, __OP_READ, 0, NULL, NULL);
                if (unlikely(ret)) {
                        if (ret == ENOENT) {
                                goto err_ret;
                        } else {
                                GOTO(err_ret, ret);
                        }
                }
        }

        char _vfm[VFM_SIZE(VFM_COUNT_MAX)];
        vfm_t *vfm;

        vfm = (void *)_vfm;
        ret = table2->vfm_get(table2, &chkid, vfm);
        if (unlikely(ret)) {
                /* NOTE(review): ENOKEY leaves vfm NULL, yet YASSERT(vfm) below
                 * runs whenever getinfo succeeds - presumably getinfo fails
                 * first in that situation; confirm. */
                if (ret == ENOKEY) {
                        vfm = NULL;
                } else
                        GOTO(err_ret, ret);
        }

#else
        ret = table2->chunk_check(table2, &chkid, __OP_READ, 0, NULL, NULL);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        goto err_ret;
                } else {
                        GOTO(err_ret, ret);
                }
        }
#endif

        ret = __table2_rdlock(table2, &chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_chunk_getinfo__(table2, &chkid, &chkinfo, &chkstat);
        if (unlikely(ret)) {
                /* ENOENT leaves through a plain goto rather than GOTO */
                if (ret == ENOENT) {
                        goto err_lock;
                } else
                        GOTO(err_lock, ret);
        }

#if CHUNK_UNINTACT_VFM
        /* count the replicas whose disk id is present in the vfm */
        int i;
        YASSERT(vfm);
        unintact = 0;
        for (i = 0; i < (int)chkinfo->repnum; i++) {
                if (vfm_exist(vfm, &chkinfo->diskid[i].id)) {
                        unintact++;
                }
        }

        /* the callback receives a private copy of the file id and runs for
         * every chunk, including those with a zero count */
        _fileid = *fileid;
        func3(_arg, &_fileid, chkinfo, &unintact);
        
#else
        unintact = chunk_proto_unintact(chkinfo, chkstat);
        if (unintact) {
                _fileid = *fileid;
                func3(_arg, &_fileid, chkinfo, &unintact);
        }
#endif

        DBUG("check "CHKID_FORMAT" unintact %u\n", CHKID_ARG(&chkid), unintact);
        
        __table2_unlock(table2, &chkid);

        return 0;
err_lock:
        __table2_unlock(table2, &chkid);
err_ret:
        return ret;
}

/**
 * Verify that every raw chunk in [from, to) of table @tid is intact with
 * respect to @vfm.
 *
 * Chunks for which __table2_cleanup_intact() returns ENOKEY or ENOENT are
 * skipped.
 *
 * @return 0 when every inspected chunk is intact; EBUSY on the first
 *         non-intact chunk or when the scan has run longer than half of
 *         gloconf.rpc_timeout; other errors from __table2_cleanup_intact().
 */
STATIC int __table2_vfm_cleanup__(table2_t *table2, const chkid_t *tid, uint32_t from, uint32_t to, const vfm_t *vfm)
{
        int ret, intact;
        uint32_t cur;
        chkid_t cid;

        ANALYSIS_BEGIN(0);
        time_t started = gettime();
        for (cur = from; cur < to; cur++) {
                fid2cid(&cid, tid, cur);
                ret = __table2_cleanup_intact(table2, &cid, &intact, vfm);
                if (unlikely(ret)) {
                        /* no such chunk: nothing to verify for this slot */
                        if (ret == ENOKEY || ret == ENOENT)
                                continue;

                        GOTO(err_ret, ret);
                }

                if (!intact) {
                        ret = EBUSY;
                        GOTO(err_ret, ret);
                }

                /* give up once half of the RPC timeout has been used */
                if (gettime() - started > gloconf.rpc_timeout / 2) {
                        ret = EBUSY;
                        DWARN("table "CHKID_FORMAT" vfm cleanup timeout, cur %u\n", CHKID_ARG(tid), cur);
                        GOTO(err_ret, ret);
                }
        }

        ANALYSIS_END(0, 1000 * 1000 * 2, NULL);

        return 0;
err_ret:
        return ret;
}


/**
 * Pre-flight pass of the vfm cleanup: verify that every raw chunk in
 * [from, to) of table @tid is intact with respect to @vfm. In
 * __table2_vfm_cleanup() this runs before the vfm lock is taken.
 *
 * Chunks for which __table2_cleanup_intact() returns ENOKEY or ENOENT are
 * skipped.
 *
 * @param table2
 * @param tid    sub-volume table id, used for chunk id construction and logs
 * @param from   first raw chunk index to inspect (inclusive)
 * @param to     end of the range (exclusive)
 * @param vfm    vfm the chunks are compared against
 * @return 0 when every inspected chunk is intact; EAGAIN on the first
 *         non-intact chunk; EBUSY when the scan has run longer than half of
 *         gloconf.rpc_timeout; other errors from __table2_cleanup_intact().
 */
STATIC int __table2_vfm_cleanup_pre(table2_t *table2, const chkid_t *tid, uint32_t from, uint32_t to, const vfm_t *vfm)
{
        int ret, intact;
        uint32_t i;
        chkid_t tmp;

        ANALYSIS_BEGIN(0);
        time_t begin = gettime(), now;
        for (i = from; i < to; i++) {
                fid2cid(&tmp, tid, i);
                ret = __table2_cleanup_intact(table2, &tmp, &intact, vfm);
                if (unlikely(ret)) {
                        /* no such chunk: nothing to verify for this slot */
                        if (ret == ENOKEY || ret == ENOENT) {
                                continue;
                        } else {
                                GOTO(err_ret, ret);
                        }
                }

                if (!intact) {
                        DWARN("chunk "CHKID_FORMAT" unintact\n", CHKID_ARG(&tmp));
                        ret = EAGAIN;
                        GOTO(err_ret, ret);
                }
                
                now = gettime();
                if (now - begin > gloconf.rpc_timeout / 2) {
                        ret = EBUSY;
                        /* time_t is not guaranteed to match %u; cast to a type
                         * with a known conversion specifier */
                        DWARN("table "CHKID_FORMAT" from %u to %u vfm cleanup"
                              " timeout, cur %u, begin %lld now %lld\n",
                              CHKID_ARG(tid), from, to, i,
                              (long long)begin, (long long)now);
                        GOTO(err_ret, ret);
                }
        }

        ANALYSIS_END(0, 1000 * 1000 * 2, NULL);

        return 0;
err_ret:
        return ret;
}

/**
 * Reset the vfm of sub-volume table @tid to empty once all of its raw chunks
 * are intact.
 *
 * Sequence: read the vfm; return at once when it is already empty; run the
 * unlocked pre-flight scan; take the vfm lock; repeat the scan; store the
 * vfm with count 0 and clock + 1 (passing the previous clock to vfm_set());
 * release the vfm lock.
 *
 * @return 0 on success or when there was nothing to clean, otherwise the
 *         error from the failing step.
 */
STATIC int __table2_vfm_cleanup(table2_t *table2, const chkid_t *tid)
{
        int ret, unlock_ret;
        uint32_t first, last;
        chkid_t cid;
        uint64_t prev_clock;
        char vfm_buf[VFM_SIZE(VFM_COUNT_MAX)];
        vfm_t *vfm = (void *)vfm_buf;

        YASSERT(tid->type == __VOLUME_SUB_CHUNK__);

        /* raw chunk range covered by this table; the first raw chunk id is
         * what the vfm_* callbacks are addressed with */
        first = tid->idx * FILE_PROTO_EXTERN_ITEM_COUNT;
        last = first + FILE_PROTO_EXTERN_ITEM_COUNT;
        cid = *tid;
        cid.idx = first;
        cid.type = __RAW_CHUNK__;

        ret = table2->vfm_get(table2, &cid, vfm);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (vfm->count == 0) {
                DBUG("table "CHKID_FORMAT" vfm clean\n", CHKID_ARG(tid));
                return 0;
        }

        {
                char dump[MAX_BUF_LEN];

                vfm_dump(vfm, dump);
                DINFO("table "CHKID_FORMAT" vfm %s\n", CHKID_ARG(tid), dump);
        }

        /* first pass without the vfm lock */
        ret = __table2_vfm_cleanup_pre(table2, tid, first, last, vfm);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = table2->vfm_lock(table2, &cid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* second pass with the vfm lock held */
        ret = __table2_vfm_cleanup__(table2, tid, first, last, vfm);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        prev_clock = vfm->clock;
        vfm->count = 0;
        vfm->clock++;
        ret = table2->vfm_set(table2, &cid, &prev_clock, vfm, 1);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ret = table2->vfm_unlock(table2, &cid);
        YASSERT(ret == 0);

        DINFO("table "CHKID_FORMAT" vfm cleanup\n", CHKID_ARG(tid));

        return 0;
err_lock:
        unlock_ret = table2->vfm_unlock(table2, &cid);
        YASSERT(unlock_ret == 0 || unlock_ret == EAGAIN);
err_ret:
        return ret;
}

/**
 * Overwrite the vfm of the table that owns @chkid with @_vfm.
 *
 * The current vfm is read first (for the log line and for the clock value
 * handed to vfm_set()), then the new vfm is stored under the vfm lock.
 *
 * @param table2
 * @param chkid  chunk id used to address the vfm callbacks
 * @param _vfm   replacement vfm
 * @return 0 on success; otherwise the error from vfm_get(), vfm_lock() or
 *         vfm_set(). A vfm_set() failure is reported to the caller even
 *         though the lock is released successfully afterwards.
 */
STATIC int __table2_vfm_set_dangerously(table2_t *table2, const chkid_t *chkid, const vfm_t *_vfm)
{
        int ret, retval;
        chkid_t tid;
        uint64_t pclock;
        vfm_t *vfm;
        char tmp[VFM_SIZE(VFM_COUNT_MAX)];

        cid2tid(&tid, chkid);

        vfm = (void *)tmp;
        ret = table2->vfm_get(table2, chkid, vfm);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        char tmp1[MAX_BUF_LEN], tmp2[MAX_BUF_LEN];
        vfm_dump(vfm, tmp1);
        vfm_dump(_vfm, tmp2);
        
        DWARN("set table "CHKID_FORMAT" vfm from %s to %s \n", CHKID_ARG(&tid), tmp1, tmp2);

        ret = table2->vfm_lock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* NOTE(review): the clock was sampled by the vfm_get() above, before
         * the vfm lock was taken - confirm that is acceptable. */
        pclock = vfm->clock;
        ret = table2->vfm_set(table2, chkid, &pclock, _vfm, 1);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ret = table2->vfm_unlock(table2, chkid);
        YASSERT(ret == 0);

        return 0;
err_lock:
        /* keep the vfm_set() error in ret: the unlock result must not
         * replace it, or the failure would be returned as 0 */
        retval = table2->vfm_unlock(table2, chkid);
        YASSERT(retval == 0);
        (void) retval;
err_ret:
        return ret;
}

/**
 * @brief Check whether the L2 table proto that owns @chkid exists; when it
 *        does not, create it (if allowed), then run table1's chunk_check on
 *        it.
 *
 * The lookup is restarted when create_table2() reports EEXIST, and when
 * chunk_check() reports EAGAIN (at most three such retries, each after a
 * 100 ms sleep).
 *
 * @param table2
 * @param chkid          raw chunk id; its owning table id is derived here
 * @param _volume_proto  volume_proto_t * forwarded to create_table2()
 * @param create_if      non-zero to create a missing table; zero to return
 *                       the lookup error instead
 * @return 0 on success, otherwise an errno value
 */
STATIC int __table2_check(table2_t *table2, const chkid_t *chkid, void *_volume_proto, int create_if)
{
        int ret, attempts = 0;
        chkid_t tid;
        table1_t *table1 = table2->table1;
        table_proto_t *table_proto;
        volume_proto_t *volume_proto = _volume_proto;

        cid2tid(&tid, chkid);

        for (;;) {
                ret = table1->get_table2(table1, &table_proto, &tid);
                if (unlikely(ret)) {
                        if (ret == EAGAIN || ret == ESTALE) {
                                GOTO(err_ret, ret);
                        }

                        /* caller did not ask for creation: hand the error back */
                        if (!create_if)
                                return ret;

                        YASSERT(ret == ENOENT);
                        ret = table1->create_table2(table1, &table_proto, &tid, volume_proto);
                        if (unlikely(ret)) {
                                /* the table already exists by now: look it up again */
                                if (ret == EEXIST)
                                        continue;

                                GOTO(err_ret, ret);
                        }
                }

                ret = table1->chunk_check(table1, &table_proto->chkid, __OP_WRITE, NULL);
                if (!unlikely(ret))
                        break;

                if (ret != EAGAIN || attempts >= 3) {
                        GOTO(err_ret, ret);
                }

                DWARN("check retry %u\n", attempts);
                usleep(100 * 1000);
                attempts++;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Take the locks needed to create a batch of raw chunks.
 *
 * Acquires the table1 read lock, then tries the write lock of each chunk in
 * @chkids. A chunk is recorded in @locked (and its lock kept) only when its
 * lock was obtained and __table2_chunk_getinfo__() reports ENOENT for it;
 * chunks that are busy, already have info, or fail getinfo with another
 * error are left out.
 *
 * On success the table1 read lock and the write locks of all chunks in
 * @locked remain held; __table2_create_unlock() releases them. On failure
 * everything acquired here has been released.
 *
 * @param locked         out: array with room for @chknum ids
 * @param _locked_count  out: number of ids stored in @locked
 * @return 0 on success; EAGAIN when table2->ltime is 0; otherwise the error
 *         from rdlock() or ltable_trywrlock().
 */
STATIC int __table2_create_lock(table2_t *table2, const chkid_t *chkids,
                                int chknum, chkid_t *locked, int *_locked_count)
{
        int ret, held = 0, i;
        table1_t *table1 = table2->table1;
        chkinfo_t *info;
        chkstat_t *stat;

        ret = table1->rdlock(table1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (i = 0; i < chknum; i++) {
                const chkid_t *cur = &chkids[i];

                YASSERT(cur->type == __RAW_CHUNK__);

                ret = ltable_trywrlock(&table2->rwlock_table, cur->idx);
                if (unlikely(ret)) {
                        if (ret != EBUSY)
                                GOTO(err_lock, ret);

                        /* lock is busy: leave this chunk out */
                        continue;
                }

                ret = __table2_extend(table2, cur->idx, __OP_WRITE);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                ret = __table2_chunk_getinfo__(table2, cur, &info, &stat);
                if (ret != ENOENT) {
                        /* either the chunk already has info (ret == 0) or
                         * getinfo failed; in both cases wait for next time */
                        ltable_unlock(&table2->rwlock_table, cur->idx);
                        continue;
                }

                /* ENOENT: this chunk needs to be created, keep its lock */
                locked[held] = *cur;
                held++;
        }

        if (table2->ltime == 0) {
                ret = EAGAIN;
                GOTO(err_lock, ret);
        }

        YASSERT(held <= chknum);
        *_locked_count = held;

        return 0;
err_lock:
        for (i = 0; i < held; i++)
                ltable_unlock(&table2->rwlock_table, locked[i].idx);

        table1->unlock(table1);
err_ret:
        return ret;
}

/**
 * Release what __table2_create_lock() left held: the write lock of each of
 * the @chknum chunks in @chkids, followed by the table1 lock.
 *
 * A failing chunk unlock is handed to UNIMPLEMENTED(__DUMP__).
 */
STATIC void __table2_create_unlock(table2_t *table2, const chkid_t *chkids, int chknum)
{
        int ret, n;
        table1_t *table1 = table2->table1;

        for (n = 0; n < chknum; n++) {
                ret = ltable_unlock(&table2->rwlock_table, chkids[n].idx);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);
        }

        table1->unlock(table1);
}

/**
 * Create a single raw chunk if it does not exist yet, under the chunk write
 * lock.
 *
 * @todo Differs from __table2_chunk_create in that the latter is passed a
 *       buf; the basic logic is the same.
 *
 * This function is called before read/write IO to create a missing raw
 * chunk; the latter is called when copying an already existing raw chunk.
 *
 * In the lazy-loading implementation of L2 chunks, both need a
 * check-and-load step that loads an L2 chunk which needs loading but has not
 * been loaded yet (i.e. iterates over and loads its children).
 *
 * @param table2
 * @param chkid          raw chunk id
 * @param io_opt         forwarded to __table2_chunk_create_wrlocksub()
 * @param _volume_proto  unused; table2->volume_proto is used instead
 * @return 0 when the chunk exists or was created, otherwise an errno value
 */
STATIC int __table2_chunk_create_lock1(table2_t *table2, const chkid_t *chkid,
                                       io_opt_t *io_opt, void *_volume_proto)
{
        int ret, flag = 0;
        volume_proto_t *volume_proto = table2->volume_proto;
        chkinfo_t *info;
        chkstat_t *stat;

        (void) _volume_proto;

        ANALYSIS_BEGIN(0);

        ret = __table2_wrlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_extend(table2, chkid->idx, __OP_WRITE);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        ret = __table2_chunk_getinfo__(table2, chkid, &info, &stat);
        if (unlikely(ret)) {
                if (ret != ENOENT)
                        GOTO(err_lock, ret);

                /* the chunk does not exist yet: create it */
                ret = __table2_chunk_create_wrlocksub(table2, chkid, 1, io_opt, volume_proto);
                if (unlikely(ret)) {
                        /* flag records that creation was the failing step */
                        flag = 2;
                        GOTO(err_lock, ret);
                }
        }

        __table2_unlock(table2, chkid);

        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        if (ret == EEXIST) {
                DFATAL("chunk %s flag %d ret %d\n", id2str(chkid), flag, ret);
        }
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return ret;
}

/**
 * Create a batch of raw chunks.
 *
 * __table2_create_lock() selects and locks the chunks of @chkid that need
 * creating; those are then created in one call to
 * __table2_chunk_create_wrlocksub(). Chunks that could not be locked are
 * skipped by the locking step.
 *
 * @param chkid          array of @chknum raw chunk ids
 * @param _volume_proto  unused; table2->volume_proto is used instead
 * @return 0 on success, otherwise an errno value
 */
STATIC int __table2_chunk_create_lock2(table2_t *table2, const chkid_t *chkid,
                                 int chknum, io_opt_t *io_opt, void *_volume_proto)
{
        int ret, flag = 0, nheld;
        volume_proto_t *volume_proto = table2->volume_proto;
        chkid_t *held;

        (void) _volume_proto;

        /* worst case every requested chunk ends up locked */
        ret = ymalloc((void **)&held, sizeof(*held) * chknum);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_create_lock(table2, chkid, chknum, held, &nheld);
        if (unlikely(ret))
                GOTO(err_free, ret);

        if (nheld != 0) {
                ret = __table2_chunk_create_wrlocksub(table2, held, nheld,
                                                   io_opt, volume_proto);
                if (unlikely(ret)) {
                        GOTO(err_lock, ret);
                }
        }

        __table2_create_unlock(table2, held, nheld);
        yfree((void **)&held);

        return 0;
err_lock:
        __table2_create_unlock(table2, held, nheld);
err_free:
        yfree((void **)&held);
err_ret:
        if (ret == EEXIST) {
                DFATAL("chunk %s flag %d ret %d\n", id2str(chkid), flag, ret);
        }
        return ret;
}

/**
 * Create one or more raw chunks.
 *
 * Every chunk first goes through __table2_check() with creation of a missing
 * L2 table enabled. A single chunk is then handled by
 * __table2_chunk_create_lock1(), any other count by
 * __table2_chunk_create_lock2().
 *
 * @param chkid          array of @chknum raw chunk ids
 * @param io_opt         forwarded to the create helpers
 * @param _volume_proto  forwarded to __table2_check()
 * @return 0 on success, otherwise an errno value
 */
STATIC int __table2_chunk_create(table2_t *table2, const chkid_t *chkid,
                                 int chknum, io_opt_t *io_opt, void *_volume_proto)
{
        int ret, n;
        volume_proto_t *volume_proto = table2->volume_proto;

        DBUG("create "CHKID_FORMAT" localize %u\n", CHKID_ARG(chkid), io_opt->localize);

        for (n = 0; n < chknum; n++) {
                ret = __table2_check(table2, chkid + n, _volume_proto, 1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        if (chknum != 1) {
                ret = __table2_chunk_create_lock2(table2, chkid, chknum, io_opt, volume_proto);
        } else {
                ret = __table2_chunk_create_lock1(table2, chkid, io_opt, volume_proto);
        }

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Discard raw chunk @chkid: delete its entry from the owning L2 table, call
 * the volume's unlink operation for it, free its chkinfo/chkstat and clear
 * the corresponding slots in table2.
 *
 * No chunk lock is taken here; the caller visible in this part of the file
 * (__table2_chunk_discard) holds the chunk write lock around the call.
 *
 * A failing unlink is only logged; the in-memory state is still released and
 * 0 is returned.
 *
 * @return 0 on success, otherwise the error from
 *         __table2_chunk_getinfo__() or __table_del().
 */
STATIC int __table2_chunk_discard__(table2_t *table2, const chkid_t *chkid, volume_proto_t *volume_proto)
{
        int ret, idx;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        table1_t *table1;
        chkid_t tableid;
        table_proto_t *table_proto;

        ANALYSIS_BEGIN(0);

        YASSERT(chkid->type == __RAW_CHUNK__);

        ret = __table2_chunk_getinfo__(table2, chkid, &chkinfo, &chkstat);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        cid2tid(&tableid, chkid);
        table1 = &volume_proto->table1;
        ret = table1->get_table2_nolock(table1, &table_proto, &tableid);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        DWARN("restart for safe, ret (%u) %s\n", ret, strerror(ret));
                        EXIT(ret);
                } else
                        /* NOTE(review): if UNIMPLEMENTED(__WARN__) does not
                         * abort, table_proto is used uninitialized below -
                         * confirm. */
                        UNIMPLEMENTED(__WARN__);
        }

        /* position of the chunk inside its L2 table */
        idx = chkid->idx % FILE_PROTO_EXTERN_ITEM_COUNT;
        ret = __table_del(table_proto, idx, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // TODO: a core dump occurs after table2_destroy
        // the restart procedure triggers the flush of every volume
        /* re-read the pointers from table2's arrays; presumably the same
         * objects getinfo returned above - confirm */
        chkinfo = table2->chunk_array[chkid->idx];
        chkstat = table2->chkstat_array[chkid->idx];

        ret = volume_proto->chunk_ops.unlink(chkinfo, chkstat);
        if (unlikely(ret)) {
                DERROR("discard "CHKID_FORMAT"\n", CHKID_ARG(chkid));
        } else {
                DBUG("discard "CHKID_FORMAT"\n", CHKID_ARG(chkid));
        }

        yfree((void **)&chkinfo);
        yfree((void **)&chkstat);

        table2->chunk_array[chkid->idx] = NULL;
        table2->chkstat_array[chkid->idx] = NULL;

        ANALYSIS_END(0, 1000 * 100, NULL);

        return 0;
err_ret:
        return ret;
}

/**
 * Discard raw chunk @chkid.
 *
 * __table2_check() is run first with table creation disabled, so a missing
 * L2 table makes the call fail instead of being created. The discard itself
 * runs under the chunk write lock.
 *
 * @param _volume_proto  volume_proto_t * of the owning volume
 * @return 0 on success, otherwise an errno value
 */
STATIC int __table2_chunk_discard(table2_t *table2, const chkid_t *chkid,
                                  void *_volume_proto)
{
        int ret;
        volume_proto_t *vp = _volume_proto;

        ANALYSIS_BEGIN(0);

        /* last argument 0: do not create the owning table when it is absent */
        ret = __table2_check(table2, chkid, _volume_proto, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_wrlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_chunk_discard__(table2, chkid, vp);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __table2_unlock(table2, chkid);

        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return ret;
}

/**
 * Unlink the chunks at positions [start_idx, stop_idx) of the parallel
 * arrays and drop their cached metadata.
 *
 * For every position the chunk is unlinked through chunk_ops.unlink() (a
 * failure is only logged), its chkinfo/chkstat buffers are released with
 * yfree() and the matching table2->chunk_array / chkstat_array slots are
 * reset to NULL.
 *
 * @param table2     table2 object whose slots are cleared
 * @param chkid      chunk ids, indexed like @chkinfo and @chkstat
 * @param chkinfo    per-chunk info pointers
 * @param chkstat    per-chunk stat pointers
 * @param start_idx  first position to process (inclusive)
 * @param stop_idx   end position (exclusive)
 * @return always 0
 */
static inline int __table2_chunk_batch_unlink(table2_t *table2, const chkid_t **chkid,
                                              chkinfo_t **chkinfo, chkstat_t **chkstat,
                                              int start_idx, int stop_idx)
{
        volume_proto_t *vp = table2->volume_proto;
        int pos = start_idx;

        while (pos < stop_idx) {
                const chkid_t *cur = chkid[pos];
                int err = vp->chunk_ops.unlink(chkinfo[pos], chkstat[pos]);

                /* an unlink failure is reported but does not stop the batch */
                if (unlikely(err)) {
                        DERROR("discard "CHKID_FORMAT"\n", CHKID_ARG(cur));
                } else {
                        DBUG("discard "CHKID_FORMAT"\n", CHKID_ARG(cur));
                }

                yfree((void **)&chkinfo[pos]);
                yfree((void **)&chkstat[pos]);

                table2->chunk_array[cur->idx] = NULL;
                table2->chkstat_array[cur->idx] = NULL;

                pos++;
        }

        return 0;
}

STATIC int __table2_chunk_batch_discard__(table2_t *table2, const chkid_t **chkid, int n_chks)
{
        int ret = 0;
        chkinfo_t **chkinfo;
        chkstat_t **chkstat;
        volume_proto_t *volume_proto;
        table1_t *table1;
        chkid_t tableid, pre_tableid;
        table_proto_t *table_proto, *pre_table_proto;
        uint32_t *idx;
        int start_idx = 0;

        volume_proto = table2->volume_proto;
        table1 = &volume_proto->table1;

        ANALYSIS_BEGIN(0);

        idx = malloc(n_chks * sizeof(uint32_t));
        chkinfo = malloc(n_chks * sizeof(chkinfo_t *));
        chkstat = malloc(n_chks * sizeof(chkstat_t *));

        for (int i=0; i < n_chks; i++) {

                YASSERT(chkid[i]->type == __RAW_CHUNK__);

                // 批量提交连续的chunk
                // 发现不连续的, 或跨subvol，则提交，重新开始
                ret = __table2_chunk_getinfo__(table2, chkid[i], &chkinfo[i], &chkstat[i]);
                if (unlikely(ret)) {
                        //GOTO(skip, ret);//todo...

                        if (start_idx != -1 && i > start_idx) {
                                ret = pre_table_proto->batch_del(pre_table_proto, idx + start_idx, i - start_idx);
                                if (!ret) {
                                        ret = __table2_chunk_batch_unlink(table2, chkid, chkinfo, chkstat, start_idx, i);
                                        if (unlikely(ret)) {
                                                // pass
                                        }
                                }
                                else
                                        DERROR("batch delete chunks failed, err=%d\r\n", ret);
                        }

                        start_idx = -1;
                        continue;
                } else {
                        if (start_idx == -1)
                                start_idx = i;
                }

                cid2tid(&tableid, chkid[i]);

                ret = table1->get_table2_nolock(table1, &table_proto, &tableid);
                if (unlikely(ret)) {
                        if (ret == EAGAIN) {
                                DWARN("restart for safe, ret (%u) %s\n", ret, strerror(ret));
                                EXIT(ret);
                        } else
                                UNIMPLEMENTED(__WARN__);
                }

                idx[i] = chkid[i]->idx % FILE_PROTO_EXTERN_ITEM_COUNT;

                chkinfo[i] = table2->chunk_array[chkid[i]->idx];
                chkstat[i] = table2->chkstat_array[chkid[i]->idx];

                if (i > 0 && pre_tableid.idx != tableid.idx && start_idx != -1 && start_idx != i) {
                        // NOT include i
                        ret = pre_table_proto->batch_del(pre_table_proto, idx + start_idx, i - start_idx);
                        if (!ret) {
                                ret = __table2_chunk_batch_unlink(table2, chkid, chkinfo, chkstat, start_idx, i);
                                if (unlikely(ret)) {
                                        // pass
                                }
                        }
                        else
                                DERROR("batch delete chunks failed, err=%d\r\n", ret);

                        start_idx = i;
                }

                if (i == n_chks - 1 && start_idx != -1) {
                        YASSERT(i + 1 - start_idx > 0);

                        // include i
                        ret = table_proto->batch_del(table_proto, idx + start_idx, i + 1 - start_idx);
                        if (!ret) {
                                ret = __table2_chunk_batch_unlink(table2, chkid, chkinfo, chkstat, start_idx, i+1);
                                if (unlikely(ret)) {
                                        // pass
                                }
                        }
                        else
                                DERROR("batch delete chunks failed, err=%d\r\n", ret);
                }

                // ignore some discard error.

                pre_tableid = tableid;
                pre_table_proto = table_proto;
        }

//err_free:
        free(idx);
        free(chkinfo);
        free(chkstat);

        ANALYSIS_END(0, 1000 * 100, NULL);

        return 0;              //not an error if chunk not found.
//err_ret:
//       return ret;
}

/**
 * Discard a batch of chunks under their table2 write locks.
 *
 * Every chunk is first validated with __table2_check(); then the write lock
 * of every chunk is taken in array order, __table2_chunk_batch_discard__()
 * is run, and all locks are released again. If a lock cannot be taken, the
 * locks acquired so far are released in reverse order before returning.
 *
 * @param table2         table2 object the chunks belong to
 * @param chkid          array of n_chks chunk ids
 * @param n_chks         number of entries in @chkid
 * @param _volume_proto  volume_proto_t pointer, passed as void *
 * @return 0 on success, otherwise the error returned by the failing step
 */
STATIC int __table2_chunk_batch_discard(table2_t *table2, const chkid_t **chkid, int n_chks,
        void *_volume_proto)
{
        int ret;
        int i = 0;

        /* Started before the first GOTO(err_ret, ...) so that the
         * ANALYSIS_END(0, ...) at err_ret is always paired with it, as in
         * the other wrappers of this file. */
        ANALYSIS_BEGIN(0);

        for (i = 0; i < n_chks; i++) {  /* todo */
                ret = __table2_check(table2, chkid[i], _volume_proto, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        for (i = 0; i < n_chks; i++) {
                ret = __table2_wrlock(table2, chkid[i]);
                if (unlikely(ret)) {
                        /* roll back the locks taken so far, newest first */
                        while (i > 0)
                                __table2_unlock(table2, chkid[--i]);

                        GOTO(err_ret, ret);
                }
        }

        ret = __table2_chunk_batch_discard__(table2, chkid, n_chks);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ANALYSIS_QUEUE(0, IO_WARN, "__table2_chunk_batch_discard");

        for (i = 0; i < n_chks; i++) {
                __table2_unlock(table2, chkid[i]);
        }

        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);
        return 0;
err_lock:
        /* all n_chks locks are held at this point */
        for (i = 0; i < n_chks; i++) {
                __table2_unlock(table2, chkid[i]);
        }
err_ret:
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);
        return ret;
}

/**
 * Create a new raw chunk and its metadata; the caller holds the chunk lock.
 *
 * Steps:
 *  1. extend table2 so that chkid->idx is addressable;
 *  2. fail with EEXIST when the chunk already has metadata;
 *  3. pick replica nodes with __table2_chunk_newinfo(), starting from
 *     fileinfo.repnum_usr; on ENOSPC the requested count is lowered one at a
 *     time down to LICH_REPLICA_MIN, either always (ENABLE_REPLICA_FALLBACK)
 *     or only when cluster_is_solomode() reports true;
 *  4. build chkinfo/chkstat in stack buffers and hand them to
 *     __table2_chunk_create____().
 *
 * @param table2        table2 object receiving the chunk
 * @param chkid         id of the chunk to create, must be __RAW_CHUNK__
 * @param io_opt        io options; ->localize is forwarded to the placement
 * @param buf           initial data, forwarded to __table2_chunk_create____()
 * @param volume_proto  owning volume
 * @return 0 on success, EEXIST if the chunk exists, otherwise the error of
 *         the failing step
 */
STATIC int __table2_chunk_createwith__(table2_t *table2, const chkid_t *chkid, io_opt_t *io_opt,
                                       const buffer_t *buf, volume_proto_t *volume_proto)
{
        int ret, count, repnum, fallback;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        char _chkinfo[CHKINFO_MAX], _chkstat[CHKSTAT_MAX];
        nid_t nid[LICH_REPLICA_MAX];

        ANALYSIS_BEGIN(0);

        YASSERT(chkid->type == __RAW_CHUNK__);

        ret = __table2_extend(table2, chkid->idx, __OP_WRITE);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        /* Only ENOENT lets the creation continue; an existing chunk is
         * reported as EEXIST.
         * TODO: lazily load the contents of the matching L2 table proto. */
        ret = __table2_chunk_getinfo__(table2, chkid, &chkinfo, &chkstat);
        if (unlikely(ret == 0))
                ret = EEXIST;
        if (ret != ENOENT)
                GOTO(err_ret, ret);

        /* from here on chkinfo/chkstat point at the local scratch buffers */
        chkinfo = (void *)_chkinfo;
        chkstat = (void *)_chkstat;
        count = volume_proto->table1.fileinfo.repnum_usr;
        YASSERT(count);

        for (;;) {
                ret = __table2_chunk_newinfo(volume_proto->table1.pool, chkid,
                                             count, io_opt->localize, nid, &repnum);
                if (!ret)
                        break;

#if ENABLE_REPLICA_FALLBACK
                fallback = 1;
#else
                fallback = cluster_is_solomode();
#endif
                /* retry with one replica less only for ENOSPC and only
                 * while above the minimum replica count */
                if (!fallback || !(ret == ENOSPC && count > LICH_REPLICA_MIN))
                        GOTO(err_ret, ret);

                count--;
        }

        memset(chkinfo, 0x0, CHKINFO_SIZE(repnum));

        diskid2loc(chkinfo->diskid, nid, repnum);
        chkinfo->id = *chkid;
        chkinfo->repnum = repnum;
        YASSERT(repnum);

        memset(chkstat, 0x0, CHKSTAT_SIZE(chkinfo->repnum));

        ret = __table2_chunk_create____(table2, io_opt, chkinfo, chkstat, buf, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(chkinfo->repnum);

        ANALYSIS_END(0, 1000 * 100, NULL);

        return 0;
err_ret:
        return ret;
}

/**
 * Create a chunk under its table2 write lock, retrying on EAGAIN.
 *
 * Each attempt runs __table2_check(), takes the write lock and calls
 * __table2_chunk_createwith__(). When the creation returns EAGAIN the lock
 * is dropped and, for up to three retries, the whole attempt is repeated
 * after a 100 ms sleep. Any other error releases the lock and is returned.
 *
 * @param table2         table2 object receiving the chunk
 * @param chkid          id of the chunk to create
 * @param io_opt         io options forwarded to the creation
 * @param _volume_proto  volume_proto_t pointer, passed as void *
 * @param buf            initial data forwarded to the creation
 * @return 0 on success, otherwise the error of the failing step
 */
STATIC int __table2_chunk_createwith(table2_t *table2, const chkid_t *chkid, io_opt_t *io_opt,
                                     void *_volume_proto, const buffer_t *buf)
{
        int ret, retry = 0;
        volume_proto_t *volume_proto = _volume_proto;

        ANALYSIS_BEGIN(0);

        for (;;) {
                ret = __table2_check(table2, chkid, _volume_proto, 1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                // TODO: lock conflict with the stat path
                ret = __table2_wrlock(table2, chkid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __table2_chunk_createwith__(table2, chkid, io_opt, buf, volume_proto);
                if (!ret)
                        break;

                if (ret != EAGAIN)
                        GOTO(err_lock, ret);

                /* EAGAIN: never sleep or give up while holding the lock */
                __table2_unlock(table2, chkid);
                if (retry >= 3)
                        GOTO(err_ret, ret);

                DWARN("create retry %u\n", retry);
                usleep(100 * 1000);
                retry++;
        }

        __table2_unlock(table2, chkid);

        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return ret;
}

/**
 * Report whether a chunk currently has metadata in table2.
 *
 * After __table2_check() succeeds, the chunk is looked up under the read
 * lock; *exist is set to 1 when the lookup succeeds and to 0 for any lookup
 * error (not only ENOENT).
 *
 * @param table2        table2 object to query
 * @param chkid         id of the chunk
 * @param exist         out: 1 if the chunk was found, 0 otherwise
 * @param volume_proto  forwarded to __table2_check()
 * @return 0 when the lookup was performed, otherwise the error from the
 *         check or from taking the lock (*exist is left untouched then)
 *
 * @todo does lazy loading have any impact here?
 */
STATIC int __table2_chunk_exist(table2_t *table2, const chkid_t *chkid, int *exist, void *volume_proto)
{
        int ret;
        chkinfo_t *cinfo;
        chkstat_t *cstat;

        ret = __table2_check(table2, chkid, volume_proto, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_rdlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* the lookup result only decides the flag, it is never returned */
        *exist = (__table2_chunk_getinfo__(table2, chkid, &cinfo, &cstat) == 0) ? 1 : 0;

        __table2_unlock(table2, chkid);

        return 0;
err_ret:
        return ret;
}


/**
 * Run __table2_chunk_sync__() for one chunk under its table2 write lock.
 *
 * Before the lock is taken the chunk is validated with __table2_check() and
 * __table2_ec_chunk_redo() is called with __OP_WRITE.
 *
 * @param table2  table2 object the chunk belongs to (its volume_proto is used)
 * @param chkid   id of the chunk
 * @param oflags  forwarded unchanged to __table2_chunk_sync__()
 * @return 0 on success, otherwise the error returned by the failing step
 */
STATIC int __table2_chunk_sync(table2_t *table2, const chkid_t *chkid, int *oflags)
{
        int ret;
        volume_proto_t *volume_proto = table2->volume_proto;

        ANALYSIS_BEGIN(0);

        ret = __table2_check(table2, chkid, volume_proto, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* done before the write lock is taken */
        ret = __table2_ec_chunk_redo(table2, chkid, __OP_WRITE, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_wrlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_chunk_sync__(table2, chkid, oflags);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __table2_unlock(table2, chkid);

        /* only the success path reports to the analysis queue */
        ANALYSIS_QUEUE(0, IO_WARN / 3, "__table2_chunk_sync");

        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        return ret;
}

/**
 * Copy the chkinfo of one chunk into a caller-provided buffer.
 *
 * The chunk is validated with __table2_check() and then read with
 * __table2_chunk__() while the read lock is held. A failure of the read
 * itself is returned without being logged through GOTO().
 *
 * @param table2   table2 object to query
 * @param chkid    id of the chunk
 * @param chkinfo  out: receives the chunk info
 * @return 0 on success, otherwise the error returned by the failing step
 */
STATIC int __table2_chunk_getinfo(table2_t *table2, const chkid_t *chkid,
                              chkinfo_t *chkinfo)
{
        int ret;

        /* a table2 lazy load may update the chunk info, so the table1 chunk
         * is checked first */
        ret = __table2_check(table2, chkid, table2->volume_proto, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_rdlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* the lock is released whatever the read returned */
        ret = __table2_chunk__(table2, chkid, chkinfo, NULL);
        __table2_unlock(table2, chkid);
        if (unlikely(ret))
                goto err_ret;

        return 0;
err_ret:
        return ret;
}

/**
 * Run __table2_chunk_set__() for one chunk under its table2 write lock.
 *
 * The chunk is validated with __table2_check(), the write lock is taken,
 * the (nid, status) pair is forwarded to __table2_chunk_set__() and the lock
 * is released on both the success and the failure path.
 *
 * @param table2         table2 object the chunk belongs to
 * @param chkid          id of the chunk
 * @param nid            node id forwarded to __table2_chunk_set__()
 * @param status         status value forwarded to __table2_chunk_set__()
 * @param _volume_proto  volume_proto_t pointer, passed as void *
 * @return 0 on success, otherwise the error returned by the failing step
 */
STATIC int __table2_chunk_set(table2_t *table2, const chkid_t *chkid,
                              const nid_t *nid, int status, void *_volume_proto)
{
        volume_proto_t *volume_proto = _volume_proto;
        int ret;

        ANALYSIS_BEGIN(0);

        ret = __table2_check(table2, chkid, _volume_proto, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_wrlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* every status value goes through the same setter */
        ret = __table2_chunk_set__(table2, chkid, nid, status, volume_proto);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __table2_unlock(table2, chkid);
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);
        return ret;
}

/**
 * Run __table2_chunk_vfm_add__() for one chunk under its table2 write lock.
 *
 * @param table2  table2 object the chunk belongs to (its volume_proto is
 *                passed to __table2_check())
 * @param chkid   id of the chunk
 * @param nid     node ids forwarded to __table2_chunk_vfm_add__()
 * @param count   forwarded together with @nid; presumably the number of
 *                entries in @nid -- TODO confirm
 * @return 0 on success, otherwise the error returned by the failing step
 */
STATIC int __table2_vfm_add(table2_t *table2, const chkid_t *chkid, const nid_t *nid, int count)
{
        int ret;
        volume_proto_t *volume_proto = table2->volume_proto;

        ANALYSIS_BEGIN(0);

        ret = __table2_check(table2, chkid, volume_proto, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_wrlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_chunk_vfm_add__(table2, chkid, nid, count);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __table2_unlock(table2, chkid);

        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);
        return 0;
err_lock:
        /* the update failed after the lock was taken: release it */
        __table2_unlock(table2, chkid);
err_ret:
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);
        return ret;
}

/**
 * Move a raw chunk so that the local node becomes its first replica.
 * The visible caller holds the table2 write lock for @chkid.
 *
 * Steps, in order:
 *  1. copy chkinfo/chkstat into stack buffers with __table2_chunk__();
 *  2. return EEXIST if any replica is already local (net_islocal());
 *  3. build the target node list: the current replica nodes with entry 0
 *     replaced by the local node id;
 *  4. lease_set(), read the vfm, run chunk_ops.check(); if the vfm clock
 *     changed, store the vfm with vfm_set(); persist chkinfo/chkstat with
 *     __table2_update_item__();
 *  5. lease_set(), chunk_ops.move() to the target node list, persist again,
 *     lease_set().
 *
 * @param table2        table2 object the chunk belongs to
 * @param chkid         id of the chunk, must be __RAW_CHUNK__
 * @param volume_proto  owning volume (lease, chunk_ops, fileinfo)
 * @return 0 on success, EEXIST if a replica is already local, otherwise the
 *         error returned by the failing step
 */
STATIC int __table2_chunk_localize__(table2_t *table2, const chkid_t *chkid,
                                     volume_proto_t *volume_proto)
{
        int ret, i;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        char _chkinfo[CHKINFO_MAX], _chkstat[CHKSTAT_MAX], _vfm[VFM_SIZE(VFM_COUNT_MAX)];
        nid_t nid[LICH_REPLICA_MAX];
        const fileinfo_t *fileinfo;
        vfm_t *vfm;
        uint64_t clock;

        YASSERT(chkid->type == __RAW_CHUNK__);

        /* work on private copies kept in the stack buffers */
        chkinfo = (void *)_chkinfo;
        chkstat = (void *)_chkstat;
        ret = __table2_chunk__(table2, chkid, chkinfo, chkstat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (i = 0; i < (int)chkinfo->repnum; i++) {
                /* nothing to do when one replica already lives on this node */
                if (net_islocal(&chkinfo->diskid[i].id)) {
                        ret = EEXIST;
                        GOTO(err_ret, ret);
                }

                nid[i] = chkinfo->diskid[i].id;
        }

        /* the local node takes the place of the first replica */
        nid[0] = *net_getnid();

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        vfm = (void *)_vfm;
        ret = table2->vfm_get(table2, chkid, vfm);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* remember the clock so a change made by check() can be detected */
        clock = vfm->clock;

        YASSERT(chkid->type == __RAW_CHUNK__);
        fileinfo = &volume_proto->table1.fileinfo;
        ret = volume_proto->chunk_ops.check(table2->table1->pool, chkinfo,
                                            chkstat, vfm, &volume_proto->chkid, 1,
                                            &volume_proto->lease.token, &fileinfo->ec, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (clock != vfm->clock) {
                ret = table2->vfm_set(table2, chkid, &clock, vfm, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ret = __table2_update_item__(table2, chkinfo, chkstat, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* NOTE(review): nid[] was filled from the pre-check chkinfo, while the
         * count passed here is the post-check chkinfo->repnum -- confirm that
         * check() cannot increase repnum. Unlike __table2_chunk_move__(), no
         * vfm is passed to move() here. */
        ret = volume_proto->chunk_ops.move(table2->table1->pool, chkinfo, chkstat,
                                           NULL, &volume_proto->chkid, nid, chkinfo->repnum,
                                           &volume_proto->lease.token, &fileinfo->ec);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_update_item__(table2, chkinfo, chkstat, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Run __table2_chunk_localize__() for one chunk under its table2 write lock.
 *
 * Before the lock is taken the chunk is validated with __table2_check() and
 * __table2_ec_chunk_redo() is called with __OP_WRITE.
 *
 * @param table2         table2 object the chunk belongs to
 * @param chkid          id of the chunk
 * @param _volume_proto  volume_proto_t pointer, passed as void *
 * @return 0 on success, otherwise the error returned by the failing step
 */
STATIC int __table2_chunk_localize(table2_t *table2, const chkid_t *chkid, void *_volume_proto)
{
        int ret;
        volume_proto_t *volume_proto = _volume_proto;

        ANALYSIS_BEGIN(0);
        ret = __table2_check(table2, chkid, _volume_proto, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* done before the write lock is taken */
        ret = __table2_ec_chunk_redo(table2, chkid, __OP_WRITE, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_wrlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_chunk_localize__(table2, chkid, volume_proto);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __table2_unlock(table2, chkid);

        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);
        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);
        return ret;
}

/**
 * Move a raw chunk to a new set of replica nodes.
 * The visible caller holds the table2 write lock for @chkid.
 *
 * Steps, in order:
 *  1. copy chkinfo/chkstat into stack buffers with __table2_chunk__();
 *  2. lease_set(), read the vfm, run chunk_ops.check(); if the vfm clock
 *     changed, store the vfm with vfm_set(); persist chkinfo/chkstat with
 *     __table2_update_item__();
 *  3. lease_set(), chunk_ops.move() to (@nid, @count); store the vfm again
 *     if its clock changed; persist chkinfo/chkstat; lease_set().
 *
 * @param table2        table2 object the chunk belongs to
 * @param chkid         id of the chunk, must be __RAW_CHUNK__
 * @param nid           target node ids
 * @param count         passed to chunk_ops.move() together with @nid
 * @param volume_proto  owning volume (lease, chunk_ops, fileinfo)
 * @return 0 on success, otherwise the error returned by the failing step
 */
STATIC int __table2_chunk_move__(table2_t *table2, const chkid_t *chkid,
                                 const nid_t *nid, int count, volume_proto_t *volume_proto)
{
        int ret;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        char _chkinfo[CHKINFO_MAX], _chkstat[CHKSTAT_MAX], _vfm[VFM_SIZE(VFM_COUNT_MAX)];
        fileinfo_t *fileinfo;
        vfm_t *vfm;
        uint64_t clock;

        YASSERT(chkid->type == __RAW_CHUNK__);

        /* work on private copies kept in the stack buffers */
        chkinfo = (void *)_chkinfo;
        chkstat = (void *)_chkstat;
        ret = __table2_chunk__(table2, chkid, chkinfo, chkstat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        vfm = (void *)_vfm;
        ret = table2->vfm_get(table2, chkid, vfm);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* remember the clock so a change made by check() can be detected */
        clock = vfm->clock;

        // check may also modify chkinfo, so the item has to be updated afterwards
        fileinfo = &volume_proto->table1.fileinfo;
        ret = volume_proto->chunk_ops.check(table2->table1->pool, chkinfo,
                                            chkstat, vfm, &volume_proto->chkid, 1,
                                            &volume_proto->lease.token, &fileinfo->ec, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (clock != vfm->clock) {
                ret = table2->vfm_set(table2, chkid, &clock, vfm, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ret = __table2_update_item__(table2, chkinfo, chkstat, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* re-sample the clock: the same change detection is repeated for move() */
        clock = vfm->clock;
        // install the new replica set
        ret = volume_proto->chunk_ops.move(table2->table1->pool, chkinfo, chkstat,
                                           vfm, &volume_proto->chkid, nid, count,
                                           &volume_proto->lease.token, &fileinfo->ec);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (clock != vfm->clock) {
                ret = table2->vfm_set(table2, chkid, &clock, vfm, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ret = __table2_update_item__(table2, chkinfo, chkstat, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Run __table2_chunk_move__() for one chunk under its table2 write lock.
 *
 * Before the lock is taken the chunk is validated with __table2_check() and
 * __table2_ec_chunk_redo() is called with __OP_WRITE.
 *
 * @param table2         table2 object the chunk belongs to
 * @param chkid          id of the chunk
 * @param nid            target node ids, forwarded to the move
 * @param count          forwarded together with @nid
 * @param _volume_proto  volume_proto_t pointer, passed as void *
 * @return 0 on success, otherwise the error returned by the failing step
 */
STATIC int __table2_chunk_move(table2_t *table2, const chkid_t *chkid,
                               const nid_t *nid, int count, void *_volume_proto)
{
        int ret;
        volume_proto_t *volume_proto = _volume_proto;

        ANALYSIS_BEGIN(0);

        ret = __table2_check(table2, chkid, _volume_proto, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* done before the write lock is taken */
        ret = __table2_ec_chunk_redo(table2, chkid, __OP_WRITE, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_wrlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_chunk_move__(table2, chkid, nid, count, volume_proto);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __table2_unlock(table2, chkid);

        /* only the success path reports to the analysis queue */
        ANALYSIS_QUEUE(0, IO_WARN, "__table2_chunk_move");
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);
        return ret;
}

/**
 * Count localized, rolled-back and sparse chunks in a chunk index range.
 *
 * The three counters in @stat are always reset to 0. The scan itself is
 * compiled only when ENABLE_LARGE_VOLUME is off; otherwise the function
 * returns 0 with the counters left at 0.
 *
 * When compiled in, chunk indexes [off, off + size) are visited from the
 * highest to the lowest, where size is the chunk count derived from the file
 * size if @_count equals STOR_STAT_WHOLLY and @_count otherwise. For each
 * index __table2_chunk_stat__() is queried: ENOENT counts the chunk as
 * sparse, other errors abort the scan.
 *
 * @param table2         table2 object to scan
 * @param fileid         file id used to build each chunk id
 * @param snap_rollback  forwarded to __table2_chunk_stat__()
 * @param stat           out: localized / sparse / rollback counters
 * @param off            first chunk index of the range
 * @param _count         number of chunks, or STOR_STAT_WHOLLY
 * @return 0 on success, otherwise the error returned by the failing step
 */
STATIC int __table2_chunk_stat(table2_t *table2, const fileid_t *fileid,
                const uint64_t *snap_rollback, filestat_t *stat, off_t off, size_t _count)
{
        size_t size;
        /* the parameters below are only used when the scan is compiled in */
        (void)table2;
        (void)fileid;
        (void)snap_rollback;

        ANALYSIS_BEGIN(0);

        stat->localized = 0;
        stat->sparse = 0;
        stat->rollback = 0;

        // TODO if max is too large
#if !ENABLE_LARGE_VOLUME
        int ret, chknum;
        chkid_t chkid;
        int i, loc, roll;

        chknum = size2chknum(table2->table1->fileinfo.size, &table2->table1->fileinfo.ec);
        size = _count == STOR_STAT_WHOLLY ? chknum : _count;
        /* NOTE(review): with STOR_STAT_WHOLLY and off > 0 the range ends at
         * off + chknum, i.e. past the last chunk -- confirm callers pass 0 */
        off_t off2 = off + size;

        /* walk the range backwards, from the last index down to off */
        for (i = off2 - 1; i >= off; i--) {
                fid2cid(&chkid, fileid, i);

                ret = __table2_check(table2, &chkid, table2->volume_proto, 1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __table2_chunk_stat__(table2, &chkid, snap_rollback, &loc, &roll);
                if (unlikely(ret)) {
                        if (ret == ENOENT) {
                                /* a chunk without metadata counts as sparse */
                                stat->sparse++;
                                continue;
                        } else
                                GOTO(err_ret, ret);
                }

                if (loc)
                        stat->localized++;
                if (roll)
                        stat->rollback++;
        }
#endif

        ANALYSIS_END(0, 1000 * 10, NULL);

        return 0;
#if !ENABLE_LARGE_VOLUME
err_ret:
        return ret;
#endif
}

/**
 * Forward a cleanup request for one chunk to the md_proto layer.
 * The visible caller holds the table2 write lock for @chkid.
 *
 * If the chunk metadata can be read, md_proto_chunk_cleanup() is called with
 * it. If the read fails with ENOENT, md_proto_chunk_cleanup1() is called with
 * the bare chunk id (its result is not examined) and the ENOENT is still
 * returned to the caller.
 *
 * @param table2        table2 object the chunk belongs to
 * @param chkid         id of the chunk
 * @param nid           node id forwarded to the md_proto cleanup call
 * @param meta_version  meta version forwarded to the md_proto cleanup call
 * @return 0 on success, otherwise the error returned by the failing step
 */
STATIC int __table2_chunk_cleanup__(table2_t *table2, const chkid_t *chkid,
                                    const nid_t *nid, uint64_t meta_version)
{
        int ret;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        char _chkstat[CHKSTAT_MAX], _chkinfo[CHKINFO_MAX];

        DBUG("cleanup "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        /* work on private copies kept in the stack buffers */
        chkinfo = (void *)_chkinfo;
        chkstat = (void *)_chkstat;
        ret = __table2_chunk__(table2, chkid, chkinfo, chkstat);
        if (unlikely(ret)) {
                DBUG("cleanup "CHKID_FORMAT" @ "CHKID_FORMAT" fail\n",
                     CHKID_ARG(chkid), CHKID_ARG(&table2->table1->chkid));

                /* no metadata for this chunk: clean up by id only */
                if (ret == ENOENT) {
                        md_proto_chunk_cleanup1(chkid, nid, meta_version);
                }

                GOTO(err_ret, ret);
        }

        ret = md_proto_chunk_cleanup(chkinfo, nid, meta_version);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Run __table2_chunk_cleanup__() for one chunk under its table2 write lock.
 *
 * @param table2         table2 object the chunk belongs to
 * @param chkid          id of the chunk
 * @param nid            node id forwarded to the cleanup
 * @param meta_version   meta version forwarded to the cleanup
 * @param _volume_proto  passed to __table2_check() only
 * @return 0 on success, otherwise the error returned by the failing step
 */
STATIC int __table2_chunk_cleanup(table2_t *table2, const chkid_t *chkid,
                                  const nid_t *nid, uint64_t meta_version, void *_volume_proto)
{
        int ret;

        ANALYSIS_BEGIN(0);

        ret = __table2_check(table2, chkid, _volume_proto, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_wrlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_chunk_cleanup__(table2, chkid, nid, meta_version);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __table2_unlock(table2, chkid);

        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return 0;
err_lock:
        /* the cleanup failed after the lock was taken: release it */
        __table2_unlock(table2, chkid);
err_ret:
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return ret;
}

/**
 * Invoke a callback on the chunk at one index of a file.
 *
 * The chunk id is built from (@fileid, @idx), validated through
 * table2->chunk_check() with __OP_READ, and looked up under the read lock.
 * When the chunk is found, @func2 is called with @_arg, a private copy of
 * the file id and the chunk info. ENOENT from either the check or the
 * lookup is returned without being logged through GOTO().
 *
 * @param table2  table2 object to query
 * @param fileid  id of the file the chunk belongs to
 * @param func2   callback; its return value, if any, is not examined
 * @param _arg    opaque argument handed to @func2
 * @param idx     chunk index within the file
 * @return 0 if the callback was invoked, otherwise the error of the failing
 *         step (ENOENT when the chunk does not exist)
 */
STATIC int __table2_chunk_iterator(table2_t *table2, const fileid_t *fileid,
                                   func2_t func2, void *_arg, uint64_t idx)
{
        int ret;
        chkid_t chkid, fid_copy;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;

        fid2cid(&chkid, fileid, idx);

        // TODO ESTALE
        ret = table2->chunk_check(table2, &chkid, __OP_READ, 0, NULL, NULL);
        if (ret == ENOENT)
                goto err_ret;           /* missing chunk: leave quietly */
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_rdlock(table2, &chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_chunk_getinfo__(table2, &chkid, &chkinfo, &chkstat);
        if (ret == ENOENT)
                goto err_lock;          /* missing chunk: leave quietly */
        if (unlikely(ret))
                GOTO(err_lock, ret);

        /* the callback gets a copy, the caller's file id is const */
        fid_copy = *fileid;
        func2(_arg, &fid_copy, chkinfo);

        __table2_unlock(table2, &chkid);

        return 0;
err_lock:
        __table2_unlock(table2, &chkid);
err_ret:
        return ret;
}

/**
 * Set the snapshot version of one chunk and persist the change.
 * The visible caller holds the table2 write lock for @chkid.
 *
 * The chunk metadata is copied into stack buffers, chkinfo->snap_version is
 * replaced by @snap_version, chkinfo->info_version is incremented and the
 * result is written back with __table2_update_item__().
 *
 * @param table2        table2 object the chunk belongs to
 * @param chkid         id of the chunk
 * @param snap_version  new snapshot version
 * @param volume_proto  owning volume, forwarded to __table2_update_item__()
 * @return 0 on success, otherwise the error returned by the failing step
 */
STATIC int __table2_chunk_snapshot_update__(table2_t *table2, const chkid_t *chkid,
                                            uint64_t snap_version, volume_proto_t *volume_proto)
{
        int ret;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        char _chkstat[CHKSTAT_MAX], _chkinfo[CHKINFO_MAX];

        /* work on private copies kept in the stack buffers */
        chkinfo = (void *)_chkinfo;
        chkstat = (void *)_chkstat;
        ret = __table2_chunk__(table2, chkid, chkinfo, chkstat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* snap_version is unsigned: print with %ju and an explicit
         * uintmax_t cast so specifier and argument types match */
        DBUG("old %ju new %ju\n", (uintmax_t)chkinfo->snap_version, (uintmax_t)snap_version);

        chkinfo->snap_version = snap_version;
        chkinfo->info_version++;

        //UNIMPLEMENTED(__WARN__);//is this safe???

        ret = __table2_update_item__(table2, chkinfo, chkstat, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        CHKINFO_DUMP(chkinfo, D_INFO);

        return 0;
err_ret:
        return ret;
}

/**
 * Run __table2_chunk_snapshot_update__() for one chunk under its table2
 * write lock.
 *
 * @param table2         table2 object the chunk belongs to
 * @param chkid          id of the chunk
 * @param snap_version   new snapshot version, forwarded to the update
 * @param _volume_proto  volume_proto_t pointer, passed as void *
 * @return 0 on success, otherwise the error returned by the failing step
 */
STATIC int __table2_chunk_snapshot_update(table2_t *table2, const chkid_t *chkid,
                                          uint64_t snap_version, void *_volume_proto)
{
        int ret;
        volume_proto_t *volume_proto = _volume_proto;

        ANALYSIS_BEGIN(0);

        ret = __table2_check(table2, chkid, _volume_proto, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_wrlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_chunk_snapshot_update__(table2, chkid, snap_version, volume_proto);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __table2_unlock(table2, chkid);

        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return 0;
err_lock:
        /* the update failed after the lock was taken: release it */
        __table2_unlock(table2, chkid);
err_ret:
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);
        return ret;
}

/**
 * Read from one chunk through chunk_ops.read() under the table2 read lock.
 *
 * Under the lock the function fetches the chunk's vfm into a stack buffer,
 * looks up the chunk's chkinfo/chkstat, calls chunk_proto_clock() with
 * __OP_READ and &vclock->clock, copies vfm->clock into vclock->vfm and then
 * issues the read into chunk_io->buf.
 *
 * ENOENT is asserted not to come from the lock, from vfm_get() or from the
 * read; a failed metadata lookup is returned as is.
 *
 * @param table2    table2 object the chunk belongs to
 * @param chunk_io  request: io.id selects the chunk, io.vclock is updated,
 *                  buf is handed to chunk_ops.read()
 * @return 0 on success, otherwise the error returned by the failing step
 */
STATIC int __table2_chunk_read(table2_t *table2, chunk_io_t *chunk_io)
{
        int ret;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        const chkid_t *chkid = &chunk_io->io.id;
        vclock_t *vclock = &chunk_io->io.vclock;
        char tmp[VFM_SIZE(VFM_COUNT_MAX)];
        volume_proto_t *volume_proto = table2->volume_proto;
        vfm_t *vfm;

        /* the vfm is read into the local scratch buffer */
        vfm = (void *)tmp;
        ret = __table2_rdlock(table2, chkid);
        if (unlikely(ret)) {
                YASSERT(ret != ENOENT);
                GOTO(err_ret, ret);
        }

        ret = table2->vfm_get(table2, chkid, vfm);
        if (unlikely(ret)) {
                YASSERT(ret != ENOENT);
                GOTO(err_lock, ret);
        }

        /* unlike the other steps, ENOENT is not asserted against here */
        ret = __table2_chunk_getinfo__(table2, chkid, &chkinfo, &chkstat);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        chunk_proto_clock(chkinfo, chkstat, &vclock->clock, __OP_READ);

        vclock->vfm = vfm->clock;

        ret = volume_proto->chunk_ops.read(chkinfo, chkstat, vfm, &chunk_io->io,
                                            &chunk_io->buf, NULL);
        if (unlikely(ret)) {
                YASSERT(ret != ENOENT);
                DBUG("chunk "CHKID_FORMAT" %s\n",
                     CHKID_ARG(chkid), strerror(ret));
                GOTO(err_lock, ret);
        }

        __table2_unlock(table2, chkid);

        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        return ret;
}

/**
 * Serve a read of a chunk that has no metadata by appending zeros.
 *
 * Under the read lock the chunk is looked up:
 *  - found              -> EEXIST (zero-fill does not apply);
 *  - ENOENT and the request does not carry __FILE_ATTR_NOFILL__
 *                       -> io.size zero bytes are appended to chunk_io->buf;
 *  - ENOENT with __FILE_ATTR_NOFILL__, or any other lookup error
 *                       -> that error is returned.
 *
 * @param table2    table2 object the chunk belongs to
 * @param chunk_io  request: io.id selects the chunk, io.flags / io.size
 *                  control the fill, buf receives the zeros
 * @return 0 when zeros were appended, otherwise the error described above
 */
STATIC int __table2_chunk_readzero(table2_t *table2, chunk_io_t *chunk_io)
{
        int ret;
        chkinfo_t *cinfo;
        chkstat_t *cstat;
        const chkid_t *chkid = &chunk_io->io.id;

        ret = __table2_rdlock(table2, chkid);
        if (unlikely(ret)) {
                YASSERT(ret != ENOENT);
                GOTO(err_ret, ret);
        }

        ret = __table2_chunk_getinfo__(table2, chkid, &cinfo, &cstat);
        if (ret == 0) {
                /* the chunk exists, so it must be read normally */
                ret = EEXIST;
                GOTO(err_lock, ret);
        }

        /* only a missing chunk may be zero-filled, and only if allowed */
        if (ret != ENOENT || (chunk_io->io.flags & __FILE_ATTR_NOFILL__))
                GOTO(err_lock, ret);

        DINFO(CHKID_FORMAT" read zero, offset %ju size %u\n",
              CHKID_ARG(&chunk_io->io.id), chunk_io->io.offset, chunk_io->io.size);
        mbuffer_appendzero(&chunk_io->buf, chunk_io->io.size);

        __table2_unlock(table2, chkid);

        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        return ret;
}

/**
 * Write chunk_io->buf to the chunk named by chunk_io->io.id.
 *
 * With the chunk read lock held, the function:
 *  1. fetches the vfm through table2->vfm_get() into a stack buffer sized
 *     for VFM_COUNT_MAX entries;
 *  2. looks up the chunk's info/stat;
 *  3. fills chunk_io->io.vclock: the clock part via chunk_proto_clock()
 *     with __OP_WRITE, the vfm part from the fetched vfm's clock;
 *  4. hands the request to volume_proto->chunk_ops.write().
 *
 * ANALYSIS_QUEUE() is only reached on the success path.
 *
 * @return 0 on success, otherwise the error of the failing step
 */
STATIC int __table2_chunk_write(table2_t *table2, chunk_io_t *chunk_io)
{
        int ret;
        char vfm_buf[VFM_SIZE(VFM_COUNT_MAX)];
        vfm_t *vfm = (void *)vfm_buf;
        chkinfo_t *cinfo;
        chkstat_t *cstat;
        volume_proto_t *proto = table2->volume_proto;
        const chkid_t *chkid = &chunk_io->io.id;
        vclock_t *vclock = &chunk_io->io.vclock;

        ANALYSIS_BEGIN(2);

        ret = __table2_rdlock(table2, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = table2->vfm_get(table2, chkid, vfm);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ret = __table2_chunk_getinfo__(table2, chkid, &cinfo, &cstat);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        /* stamp the request before it is handed to the chunk layer */
        chunk_proto_clock(cinfo, cstat, &vclock->clock, __OP_WRITE);
        vclock->vfm = vfm->clock;

        ret = proto->chunk_ops.write(cinfo, cstat, vfm, &chunk_io->io,
                                     &chunk_io->buf, NULL);
        if (unlikely(ret)) {
                DBUG("chunk write "CHKID_FORMAT" (%d)%s\n",
                     CHKID_ARG(chkid), ret, strerror(ret));
                GOTO(err_lock, ret);
        }

        __table2_unlock(table2, chkid);

        ANALYSIS_QUEUE(2, IO_WARN, "__volume_proto_chunk_write_02");

        return 0;
err_lock:
        __table2_unlock(table2, chkid);
err_ret:
        return ret;
}


/**
 * Store a new vfm for the subvol @tid: persist it, then refresh the cache.
 *
 * Steps, in order:
 *  1. if @prev is given, it must equal the cached vfm's clock (ESTALE
 *     otherwise) and _vfm->clock must not be lower than the cached clock
 *     (EINVAL otherwise);
 *  2. base64-encode @_vfm; an encoding (terminator included) longer than
 *     TABLE_PRORO_XATTR_ITEM_SIZE / 2 is rejected with EIO;
 *  3. persist the encoding with table_proto->setinfo(TABLE_PROTO_INFO_VFM);
 *  4. resize the cached vfm, copy @_vfm into it and store the (possibly
 *     moved) pointer back into table1->vfm_array[tid->idx].
 *
 * A yrealloc() failure in step 4 ends in UNIMPLEMENTED(__DUMP__), since the
 * new value has already been persisted at that point.
 *
 * No lock is taken here; __table2_vfm_set() calls this with the subvol
 * write lock held.
 *
 * NOTE(review): the cached vfm is dereferenced without a NULL check --
 * confirm every caller guarantees it has been loaded.
 *
 * @return 0 on success, ESTALE/EINVAL/EIO or the setinfo() error otherwise
 */
STATIC int __table2_vfm_set__(table2_t *table2, const chkid_t *tid, const uint64_t *prev, const vfm_t *_vfm)
{
        int ret, encoded_len;
        char buf[MAX_BUF_LEN];
        vfm_t *cached = table2->table1->vfm_array[tid->idx].vfm;
        table_proto_t *table_proto = table2->table1->table_array[tid->idx];

        if (likely(prev)) {
                /* the caller's view of the clock must still be current */
                if (*prev != cached->clock) {
                        ret = ESTALE;
                        GOTO(err_ret, ret);
                }

                /* never move the clock backwards */
                if (_vfm->clock < cached->clock) {
                        ret = EINVAL;
                        GOTO(err_ret, ret);
                }
        }

        base64_encode((void *)_vfm, VFM_SIZE(_vfm->count), buf);
        encoded_len = strlen(buf) + 1;
        if (encoded_len > TABLE_PRORO_XATTR_ITEM_SIZE / 2) {
                ret = EIO;
                GOTO(err_ret, ret);
        }

        ret = table_proto->setinfo(table_proto, buf, encoded_len, TABLE_PROTO_INFO_VFM);
        if (unlikely(ret)) {
                DWARN("set "CHKID_FORMAT" vfm fail, ret:%d\n", CHKID_ARG(&table_proto->chkid), ret);
                GOTO(err_ret, ret);
        }

        ret = yrealloc((void **)&cached, VFM_SIZE(cached->count), VFM_SIZE(_vfm->count));
        if (ret)
                UNIMPLEMENTED(__DUMP__);

        memcpy(cached, _vfm, VFM_SIZE(_vfm->count));
        /* re-index the array: the pointer may have moved in yrealloc() */
        table2->table1->vfm_array[tid->idx].vfm = cached;

        /* buf is reused for the human-readable dump */
        vfm_dump(_vfm, buf);

        DINFO("set "CHKID_FORMAT" vfm %s\n",
              CHKID_ARG(&table_proto->chkid), buf);

        return 0;
err_ret:
        return ret;
}

/**
 * Replace the vfm of the subvol that owns the raw chunk @chkid.
 *
 * The chunk id is mapped to its table id, table1 is extended to cover that
 * index, and the update itself (__table2_vfm_set__()) runs under the subvol
 * write lock. If the subvol's vfm is flagged as locked, the call fails with
 * EBUSY unless @force is non-zero.
 *
 * After a successful update with a non-empty vfm (_vfm->count != 0),
 * recovery_wakeup_one_pool() is called for table1->pool; this happens after
 * the subvol lock has been dropped.
 *
 * @param table2 table owning the subvol
 * @param chkid  raw chunk id (asserted to be __RAW_CHUNK__)
 * @param prev   optional expected current clock, forwarded to
 *               __table2_vfm_set__()
 * @param _vfm   new vfm content
 * @param force  non-zero to ignore the vfm lock flag
 * @return 0 on success, EBUSY when locked and not forced, or the error from
 *         locking / __table2_vfm_set__()
 */
STATIC int __table2_vfm_set(table2_t *table2, const chkid_t *chkid, const uint64_t *prev, const vfm_t *_vfm, int force)
{
        int ret;
        table1_t *table1 = table2->table1;
        chkid_t tid;
        vfm_mem_t *vfm_mem;

        ANALYSIS_BEGIN(0);

        YASSERT(chkid->type == __RAW_CHUNK__);
        cid2tid(&tid, chkid);

        table1->extend(table1, tid.idx);

        ret = __table2_wrlock_sub(table2, &tid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* a locked vfm may only be overwritten when the caller forces it */
        vfm_mem = &(table2->table1->vfm_array[tid.idx]);
        if (vfm_mem->lock && force == 0) {
                DWARN("subvol "CHKID_FORMAT" locked\n", CHKID_ARG(&tid));
                ret = EBUSY;
                GOTO(err_lock, ret);
        }

        ret = __table2_vfm_set__(table2, &tid, prev, _vfm);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        __table2_unlock_sub(table2, &tid);

        /* a non-empty vfm is signalled to recovery, outside the lock */
        if (_vfm->count) {
                recovery_wakeup_one_pool(table1->pool, "__table2_vfm_set");
        }

        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);
        return 0;
err_lock:
        __table2_unlock_sub(table2, &tid);
err_ret:
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);
        return ret;
}

/**
 * Copy the cached vfm of the subvol that owns the raw chunk @chkid.
 *
 * The vfm is loaded first (__table2_vfm_load()), then read under the subvol
 * read lock. VFM_SIZE(count) bytes are copied into @_vfm, so the caller's
 * buffer must be large enough for the stored entry count.
 *
 * @param table2 table owning the subvol
 * @param chkid  raw chunk id (asserted to be __RAW_CHUNK__)
 * @param _vfm   output buffer
 * @return 0 on success; ENOKEY when no vfm is cached for the subvol; EBUSY
 *         when the vfm is flagged as locked; otherwise the load/lock error
 */
STATIC int __table2_vfm_get(table2_t *table2, const chkid_t *chkid, vfm_t *_vfm)
{
        int ret;
        chkid_t tid;
        vfm_mem_t *slot;
        vfm_t *cached;

        YASSERT(chkid->type == __RAW_CHUNK__);
        cid2tid(&tid, chkid);

        ret = __table2_vfm_load(table2, &tid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table2_rdlock_sub(table2, &tid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        slot = &(table2->table1->vfm_array[tid.idx]);
        cached = slot->vfm;
        if (cached == NULL) {
                ret = ENOKEY;
                GOTO(err_lock, ret);
        }

        /* readers are turned away while the vfm is flagged as locked */
        if (slot->lock) {
                DWARN("subvol "CHKID_FORMAT" locked\n", CHKID_ARG(&tid));
                ret = EBUSY;
                GOTO(err_lock, ret);
        }

        memcpy(_vfm, cached, VFM_SIZE(cached->count));

        __table2_unlock_sub(table2, &tid);

        return 0;
err_lock:
        __table2_unlock_sub(table2, &tid);
err_ret:
        return ret;
}

/**
 * Set the vfm lock flag of the subvol that owns @chkid.
 *
 * table1 is extended to cover the subvol index, then the flag is tested and
 * set under the subvol write lock. The flag is not re-entrant: if it is
 * already set the call fails with EBUSY.
 *
 * @return 0 on success, EBUSY when already locked, or the error from taking
 *         the subvol write lock
 */
STATIC int __table2_vfm_lock(table2_t *table2, const chkid_t *chkid)
{
        int ret;
        chkid_t tid;
        vfm_mem_t *slot;
        table1_t *t1 = table2->table1;

        ANALYSIS_BEGIN(0);

        cid2tid(&tid, chkid);

        t1->extend(t1, tid.idx);

        ret = __table2_wrlock_sub(table2, &tid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        slot = &(table2->table1->vfm_array[tid.idx]);
        if (slot->lock) {
                DWARN("subvol "CHKID_FORMAT" locked\n", CHKID_ARG(&tid));
                ret = EBUSY;
                GOTO(err_lock, ret);
        }

        slot->lock = 1;

        __table2_unlock_sub(table2, &tid);

        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return 0;
err_lock:
        __table2_unlock_sub(table2, &tid);
err_ret:
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);
        return ret;
}

/**
 * Clear the vfm lock flag of the subvol that owns @chkid.
 *
 * table1 is extended to cover the subvol index, then the flag is cleared
 * under the subvol write lock. The flag is asserted to be set: unlocking a
 * vfm that is not locked trips YASSERT.
 *
 * @return 0 on success, or the error from taking the subvol write lock
 */
STATIC int __table2_vfm_unlock(table2_t *table2, const chkid_t *chkid)
{
        int ret;
        chkid_t tid;
        vfm_mem_t *slot;
        table1_t *t1 = table2->table1;

        ANALYSIS_BEGIN(0);

        cid2tid(&tid, chkid);

        t1->extend(t1, tid.idx);

        ret = __table2_wrlock_sub(table2, &tid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        slot = &(table2->table1->vfm_array[tid.idx]);
        YASSERT(slot->lock);
        slot->lock = 0;

        __table2_unlock_sub(table2, &tid);

        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return 0;
err_ret:
        ANALYSIS_END(0, 1000 * 1000 * 5, NULL);

        return ret;
}

/**
 * Initialise a table2 object and bind it to its parent table1.
 *
 * The structure is zeroed, its locks are initialised (one plock and three
 * lock tables), the operation function pointers are filled in, and finally
 * table2->table1 is set.
 *
 * When LOCK_DEBUG is enabled the plock gets a name derived from the table1
 * chunk id; otherwise a NULL name is passed.
 *
 * NOTE(review): if a later lock initialisation fails, the locks that were
 * already initialised are not torn down here -- confirm whether the caller
 * is expected to handle that.
 *
 * @param table2 object to initialise (fully overwritten)
 * @param table1 parent table stored in table2->table1
 * @return 0 on success, otherwise the error of the failing lock init
 */
int table2_init(table2_t *table2, table1_t *table1)
{
        int ret;
        char *pname = NULL;
#if LOCK_DEBUG
        char lname[MAX_LOCK_NAME];
#endif

        memset(table2, 0x0, sizeof(*table2));

#if LOCK_DEBUG
        /* bounded formatting: lname is a fixed-size buffer */
        snprintf(lname, sizeof(lname), "table2.rwlock."CHKID_FORMAT,
                 CHKID_ARG(&table1->chkid));
        pname = lname;
#endif
        ret = plock_init(&table2->rwlock, pname);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ltable_init(&table2->rwlock_sub, "rwlock_sub");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ltable_init(&table2->rwlock_table, "rwlock_raw");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ltable_init(&table2->eclock_table, "table2");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* chunk lifecycle */
        table2->chunk_create = __table2_chunk_create;
        table2->chunk_createwith = __table2_chunk_createwith;
        table2->chunk_discard = __table2_chunk_discard;
        table2->chunk_batch_discard = __table2_chunk_batch_discard;
        table2->chunk_cleanup = __table2_chunk_cleanup;

        /* chunk queries */
        table2->chunk_exist = __table2_chunk_exist;
        table2->chunk_getinfo = __table2_chunk_getinfo;
        table2->chunk_stat = __table2_chunk_stat;
        table2->chunk_iterator = __table2_chunk_iterator;
        table2->chunk_unintact = __table2_chunk_unintact;

        table2->extend = __table2_extend;
        table2->chunk_set = __table2_chunk_set;

        table2->chunk_check = __table2_chunk_check;
        table2->chunk_sync = __table2_chunk_sync;
        table2->chunk_move = __table2_chunk_move;

        /* chunk I/O */
        table2->chunk_localize = __table2_chunk_localize;
        table2->chunk_readzero = __table2_chunk_readzero;
        table2->chunk_read = __table2_chunk_read;
        table2->chunk_write = __table2_chunk_write;

        //table2->chunk_intact = __table2_chunk_intact;
        table2->chunk_snapshot_update = __table2_chunk_snapshot_update;

        table2->pre_io = __table2_pre_io;
        table2->reset = __table2_reset;
        table2->post_io = __table2_post_io;

        /* vfm handling */
        table2->vfm_lock = __table2_vfm_lock;
        table2->vfm_unlock = __table2_vfm_unlock;

        table2->vfm_set = __table2_vfm_set;
        table2->vfm_get = __table2_vfm_get;
        table2->vfm_add = __table2_vfm_add;

        table2->vfm_cleanup = __table2_vfm_cleanup;
        table2->vfm_set_dangerously = __table2_vfm_set_dangerously;

        table2->load_bmap = __table2_load_bmap;

        table2->table1 = table1;

        return 0;
err_ret:
        return ret;
}
